Addax 介绍#
Addax 介绍#
概览#
Addax 是一个异构数据源离线同步工具(最初来源于阿里的 DataX ,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
addax_why_new
为了解决异构数据源同步问题,Addax将复杂的网状的同步链路变成了星型数据链路,Addax作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到Addax,便能跟已有的数据源做到无缝数据同步。
框架设计#
addax_framework_new
Addax本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。
Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer 为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
Addax Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
核心架构#
本小节按一个Addax作业生命周期的时序图,从整体架构设计非常简要说明各个模块相互关系。
addax_arch
核心模块介绍:#
Addax 完成单个数据同步的作业,我们称之为Job,Adda x接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。Addax Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子 Task)、TaskGroup 管理等功能。
Addax Job 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task (子任务),以便于并发执行。Task 便是 Addax 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。
切分多个 Task 之后,Addax Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为5。
每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动
Reader—>Channel—>Writer
的线程来完成任务同步工作。Addax 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非0
调度流程:#
举例来说,用户提交了一个作业,并且配置了 20 个并发,目的是将一个100张分表的mysql数据同步到oracle里面。 调度决策思路是:
Addax Job 根据分库分表切分成了 100 个 Task。
根据 20 个并发,计算共需要分配
20/5 = 4
个 TaskGroup。4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以5个并发共计运行25个 Task。
核心优势#
可靠的数据质量监控#
完美解决数据传输个别类型失真问题
支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
提供作业全链路的流量、数据量运行时监控
运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据 Addax 认为就是脏数据。Addax 目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
丰富的数据转换功能#
作为一个服务于大数据的 ETL 工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动 groovy
函数,让用户自定义转换函数。详情请看 transformer 详细介绍。
精准的速度控制#
提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
{
"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}
}
强劲地同步性能#
每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让速度随并发成线性增长。 在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。
健壮的容错机制#
作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是 Addax 的基本要求,在 Addax 的设计中,重点完善了框架和插件的稳定性。 目前 Addax 可以做到线程级别、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
快速使用#
安装 Addax#
如果你不想编译,你可以执行下面的命令,直接从下载已经编译好的二进制文件
curl -sS -o addax-4.0.2.tar.gz https://github.com/wgzhao/Addax/releases/download/4.0.2/addax-4.0.2.tar.gz`
tar -xzf addax-4.0.2.tar.gz
cd addax-4.0.2
或者你可以自行下载源代码进行编译
git clone https://github.com/wgzhao/addax.git
cd addax
git checkout 4.0.2
mvn clean package -pl '!:addax-docs'
mvn package assembly:single
cd target/addax/addax-4.0.2
开始第一个采集任务#
要使用 Addax
进行数据采集,只需要编写一个任务采集文件,该文件为 JSON 格式,以下是一个简单的配置文件,该任务的目的是从内存读取读取指定内容的数据,并将其打印出来。
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "addax",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述文件保存为 job/test.json
然后执行下面的命令:
bin/addax.sh job/test.json
如果没有报错,应该会有类似这样的输出
___ _ _
/ _ \ | | | |
/ /_\ \ __| | __| | __ ___ __
| _ |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |> <
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version :: (v4.0.3-SNAPSHOT)
2021-08-23 13:45:17.199 [ main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-23 13:45:17.223 [ main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"column":[
{
"type":"string",
"value":"addax"
},
{
"type":"long",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
}
],
"sliceRecordCount":10
},
"name":"streamreader"
},
"writer":{
"parameter":{
"print":true
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"byte":-1,
"channel":1
}
}
}
2021-08-23 13:45:17.238 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-23 13:45:17.239 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-23 13:45:17.240 [ main] INFO JobContainer - Set jobId = 0
2021-08-23 13:45:17.250 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do prepare work .
2021-08-23 13:45:17.250 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do prepare work .
2021-08-23 13:45:17.251 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-23 13:45:17.251 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-23 13:45:17.252 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-23 13:45:17.276 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-23 13:45:17.282 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-23 13:45:17.287 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-23 13:45:17.288 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
2021-08-23 13:45:20.295 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-23 13:45:20.296 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-08-23 13:45:20.297 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do post work.
2021-08-23 13:45:20.302 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-23 13:45:20.305 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 220 bytes | Speed 73B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.011s | Percentage 100.00%
2021-08-23 13:45:20.307 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-23 13:45:17
任务结束时刻 : 2021-08-23 13:45:20
任务总计耗时 : 3s
任务平均流量 : 73B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0
接下来,你可以继续了解如何配置一个采集任务文件
任务配置#
一个采集任务就是一个 JSON 格式配置文件,该配置文件的模板如下:
{
"job": {
"settings": {}
"content": [
{
"reader": {},
"writer": {}
"transformer": []
}
]
}
}
任务配置由 key 为 job
的字典组成,其字典元素由三部分组成:
reader
: 用来配置数据读取所需要的相关信息,这是必填内容writer
: 用来配置写入数据所需要的相关信息,这是必填内容transformer
: 数据转换规则,如果需要对读取的数据在写入之前做一些变换,可以配置该项,否则可以不配置。
读取插件#
本章描述 Addax 目前支持的数据读取插件
Cassandra Reader#
CassandraReader
插件实现了从 Cassandra 读取数据。
配置#
下面是配置一个从 Cassandra 读取数据到终端的例子
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "cassandrareader",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "test",
"table": "addax_src",
"column": [
"textCol",
"blobCol",
"writetime(blobCol)",
"boolCol",
"smallintCol",
"tinyintCol",
"intCol",
"bigintCol",
"varintCol",
"floatCol",
"doubleCol",
"decimalCol",
"dateCol",
"timeCol",
"timeStampCol",
"uuidCol",
"inetCol",
"durationCol",
"listCol",
"mapCol",
"setCol",
"tupleCol",
"udtCol"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
参数说明#
parameter
配置项支持以下配置
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
host | 是 | 无 | Cassandra连接点的域名或ip,多个node之间用逗号分隔 |
port | 是 | 9042 | Cassandra端口 |
username | 否 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
useSSL | 否 | false | 是否使用SSL连接 |
keyspace | 是 | 无 | 需要同步的表所在的keyspace |
table | 是 | 无 | 所选取的需要同步的表 |
column | 是 | 无 | 所配置的表中需要同步的列集合,其中的元素可以指定列的名称或 writetime(column_name) ,后一种形式会读取column_name 列的时间戳而不是数据 |
where | 否 | 无 | 数据筛选条件的 cql 表达式 |
allowFiltering | 否 | 无 | 是否在服务端过滤数据,详细描述参考官方文档的相关描述 |
consistancyLevel | 否 | LOCAL_QUORUM | 数据一致性级别, 可选 ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY, TWO, THREE, LOCAL_ONE |
支持的数据类型#
目前支持除 counter
和 Custom
类型之外的所有类型。
下面列出 CassandraReader
针对 Cassandra
类型转换列表:
Addax 内部类型 | Cassandra 数据类型 |
---|---|
Long | int, tinyint, smallint,varint,bigint,time |
Double | float, double, decimal |
String | ascii,varchar, text,uuid,timeuuid,duration,list,map,set,tuple,udt,inet |
Date | date, timestamp |
Boolean | bool |
Bytes | blob |
ClickHouse Reader#
ClickHouseReader
插件支持从 ClickHouse数据库读取数据。
示例#
表结构及数据信息#
假定需要的读取的表的结构以及数据如下:
CREATE TABLE ck_addax (
c_int8 Int8,
c_int16 Int16,
c_int32 Int32,
c_int64 Int64,
c_uint8 UInt8,
c_uint16 UInt16,
c_uint32 UInt32,
c_uint64 UInt64,
c_float32 Float32,
c_float64 Float64,
c_decimal Decimal(38,10),
c_string String,
c_fixstr FixedString(36),
c_uuid UUID,
c_date Date,
c_datetime DateTime('Asia/Chongqing'),
c_datetime64 DateTime64(3, 'Asia/Chongqing'),
c_enum Enum('hello' = 1, 'world'=2)
) ENGINE = MergeTree() ORDER BY (c_int8, c_int16) SETTINGS index_granularity = 8192;
insert into ck_addax values(
127,
-32768,
2147483647,
-9223372036854775808,
255,
65535,
4294967295,
18446744073709551615,
0.999999999999,
0.99999999999999999,
1234567891234567891234567891.1234567891,
'Hello String',
'2c:16:db:a3:3a:4f',
'5F042A36-5B0C-4F71-ADFD-4DF4FCA1B863',
'2021-01-01',
'2021-01-01 00:00:00',
'2021-01-01 00:00:00',
'hello'
);
配置 json 文件#
下面的配置文件表示从 ClickHouse 数据库读取指定的表数据并打印到终端
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"connection": [
{
"table": [
"ck_addax"
],
"jdbcUrl": [
"jdbc:clickhouse://127.0.0.1:8123/default"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述配置文件保存为 job/clickhouse2stream.json
执行采集命令#
执行以下命令进行数据采集
bin/addax.sh job/clickhouse2stream.json
其输出信息如下(删除了非关键信息)
2021-01-06 14:39:35.742 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-01-06 14:39:35.767 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"column":[
"*"
],
"connection":[
{
"jdbcUrl":[
"jdbc:clickhouse://127.0.0.1:8123/"
],
"table":[
"ck_addax"
]
}
],
"username":"default"
},
"name":"clickhousereader"
},
"writer":{
"parameter":{
"print":true
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"channel":3
}
}
}
127 -32768 2147483647 -9223372036854775808 255 65535 4294967295 18446744073709551615 1 1 1234567891234567891234567891.1234567891Hello String 2c:16:db:a3:3a:4f
5f042a36-5b0c-4f71-adfd-4df4fca1b863 2021-01-01 2021-01-01 00:00:00 2021-01-01 00:00:00 hello
任务启动时刻 : 2021-01-06 14:39:35
任务结束时刻 : 2021-01-06 14:39:39
任务总计耗时 : 3s
任务平均流量 : 77B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
参数说明#
parameter
配置项支持以下配置
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | array | 无 | ClickHouse JDBC 连接信息 ,可按照官方规范填写连接附件控制信息。具体请参看ClickHouse官方文档 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | array | 无 | 所选取的需要同步的表 ,当配置为多张表时,用户自己需保证多张表是同一schema结构 |
column | 是 | array | 无 | 所配置的表中需要同步的列名集合, 使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 ["*"] |
splitPk | 否 | string | 无 | 希望使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能 |
autoPk | 否 | bool | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | string | 无 | 筛选条件 |
querySql | 否 | array | 无 | 使用SQL查询而不是直接指定表的方式读取数据,当用户配置querySql时,ClickHouseReader直接忽略table、column、where条件的配置 |
支持的数据类型#
目前ClickHouseReader支持大部分ClickHouse类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出ClickHouseReader针对ClickHouse类型转换列表:
Addax 内部类型 | ClickHouse 数据类型 |
---|---|
Long | Uint8, Uint16, Uint32, Uint64, Int8, Int16, Int32, Int64, Enum8, Enum16 |
Double | Float32, Float64, Decimal |
String | String, FixedString(N) |
Date | Date, DateTime, DateTime64 |
Boolean | UInt8 |
Bytes | String |
限制#
除上述罗列字段类型外,其他类型均不支持,如Array、Nested等
Data Reader#
DataReader
插件是专门提供用于开发和测试环境中,生产满足一定规则要求的数据的插件。
在实际开发和测试中,我们需要按照一定的业务规则来生产测试数据,而不仅仅是随机内容,比如身份证号码,银行账号,股票代码等。
为什么要重复发明轮子#
诚然,网络上有相当多的专门的数据生产工具,其中不乏功能强大、性能也强悍。 但这些工具大部分是考虑到了数据生成这一段,而忽略了数据写入到目标端的问题,或者说有些考虑到了,但仅仅只考虑了一种或有限的几种数据库。
恰好 Addax 工具能够提供足够多的目标端写入能力,加上之前的已有的 streamReader 已经算是一个简单版的数据生成工具,因此在此功能上 增加一些特定规则,再利用写入端多样性的能力,自然就成为了一个较好的数据生成工具。
配置示例#
这里我把目前插件支持的规则全部列举到下面的例子中
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "datareader",
"parameter": {
"column": [
{
"value": "1,100,",
"rule": "random",
"type": "double"
},
{
"value": "DataX",
"type": "string"
},
{
"value": "1",
"rule": "incr",
"type": "long"
},
{
"value": "1989/06/04 00:00:01,-1",
"rule": "incr",
"type": "date",
"dateFormat": "yyyy/MM/dd hh:mm:ss"
},
{
"value": "test",
"type": "bytes"
},
{
"rule": "address"
},
{
"rule": "bank"
},
{
"rule": "company"
},
{
"rule": "creditCard"
},
{
"rule": "debitCard"
},
{
"rule": "idCard"
},
{
"rule": "lat"
},
{
"rule": "lng"
},
{
"rule": "name"
},
{
"rule": "job"
},
{
"rule": "phone"
},
{
"rule": "stockCode"
},
{
"rule": "stockAccount"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
保存上述内容到 job/datareader2stream.json
然后执行该任务,其输出结果类似如下:
$ bin/addax.sh job/datareader2stream.json
___ _ _
/ _ \ | | | |
/ /_\ \ __| | __| | __ ___ __
| _ |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |> <
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version :: (v4.0.2-SNAPSHOT)
2021-08-13 17:02:00.888 [ main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-13 17:02:00.910 [ main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"column":[
{
"rule":"random",
"type":"double",
"scale": "2",
"value":"1,100,"
},
{
"type":"string",
"value":"DataX"
},
{
"rule":"incr",
"type":"long",
"value":"1"
},
{
"dateFormat":"yyyy/MM/dd hh:mm:ss",
"rule":"incr",
"type":"date",
"value":"1989/06/04 00:00:01,-1"
},
{
"type":"bytes",
"value":"test"
},
{
"rule":"address"
},
{
"rule":"bank"
},
{
"rule":"company"
},
{
"rule":"creditCard"
},
{
"rule":"debitCard"
},
{
"rule":"idCard"
},
{
"rule":"lat"
},
{
"rule":"lng"
},
{
"rule":"name"
},
{
"rule":"job"
},
{
"rule":"phone"
},
{
"rule":"stockCode"
},
{
"rule":"stockAccount"
}
],
"sliceRecordCount":10
},
"name":"datareader"
},
"writer":{
"parameter":{
"print":true,
"encoding":"UTF-8"
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"byte":-1,
"channel":1
}
}
}
2021-08-13 17:02:00.937 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-13 17:02:00.938 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-13 17:02:00.940 [ main] INFO JobContainer - Set jobId = 0
2021-08-13 17:02:00.976 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] do prepare work .
2021-08-13 17:02:00.977 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do prepare work .
2021-08-13 17:02:00.978 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-13 17:02:00.979 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] splits to [1] tasks.
2021-08-13 17:02:00.980 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-13 17:02:01.002 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-13 17:02:01.009 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-13 17:02:01.017 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-13 17:02:01.017 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
7.65 DataX 1 1989-06-04 00:00:01 test 天津市南京县长寿区光明路263号 交通银行 易动力信息有限公司 6227894836568607 6235712610856305437 450304194808316766 31.3732613 -125.3507716 龚军 机电工程师 13438631667 726929 8741848665
18.58 DataX 2 1989-06-03 00:00:01 test 江苏省太原市浔阳区东山路33号 中国银行 时空盒数字信息有限公司 4096666711928233 6217419359154239015 220301200008188547 48.6648764 104.8567048 匡飞 化妆师 18093137306 006845 1815787371
16.16 DataX 3 1989-06-02 00:00:01 test 台湾省邯郸市清河区万顺路10号 大同商行 开发区世创科技有限公司 4096713966912225 6212977716107080594 150223196408276322 29.0134395 142.6426842 支波 审核员 13013458079 020695 3545552026
63.89 DataX 4 1989-06-01 00:00:01 test 上海市辛集县六枝特区甘园路119号 中国农业银行 泰麒麟传媒有限公司 6227893481508780 6215686558778997167 220822196208286838 -71.6484635 111.8181273 敬坤 房地产客服 13384928291 174445 0799668655
79.18 DataX 5 1989-05-31 00:00:01 test 陕西省南京市朝阳区大胜路170号 内蒙古银行 晖来计算机信息有限公司 6227535683896707 6217255315590053833 350600198508222018 -24.9783587 78.017024 蒋杨 固定资产会计 18766298716 402188 9633759917
14.97 DataX 6 1989-05-30 00:00:01 test 海南省长春县璧山区碧海街147号 华夏银行 浙大万朋科技有限公司 6224797475369912 6215680436662199846 220122199608190275 -3.5088667 -40.2634359 边杨 督导/巡店 13278765923 092780 2408887582
45.49 DataX 7 1989-05-29 00:00:01 test 台湾省潜江县梁平区七星街201号 晋城商行 开发区世创信息有限公司 5257468530819766 6213336008535546044 141082197908244004 -72.9200596 120.6018163 桑明 系统工程师 13853379719 175864 8303448618
8.45 DataX 8 1989-05-28 00:00:01 test 海南省杭州县城北区天兴路11号 大同商行 万迅电脑科技有限公司 6227639043120062 6270259717880740332 430405198908214042 -16.5115338 -39.336119 覃健 人事总监 13950216061 687461 0216734574
15.01 DataX 9 1989-05-27 00:00:01 test 云南省惠州市和平区海鸥街201号 内蒙古银行 黄石金承信息有限公司 6200358843233005 6235730928871528500 130300195008312067 -61.646097 163.0882369 卫建华 电话采编 15292600492 001658 1045093445
55.14 DataX 10 1989-05-26 00:00:01 test 辽宁省兰州市徐汇区东山街176号 廊坊银行 创汇科技有限公司 6227605280751588 6270262330691012025 341822200908168063 77.2165746 139.5431377 池浩 多媒体设计 18693948216 201678 0692522928
2021-08-13 17:02:04.020 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-13 17:02:04.021 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-08-13 17:02:04.022 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] do post work.
2021-08-13 17:02:04.025 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-13 17:02:04.028 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 1817 bytes | Speed 605B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-13 17:02:04.030 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-13 17:02:00
任务结束时刻 : 2021-08-13 17:02:04
任务总计耗时 : 3s
任务平均流量 : 605B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0
配置说明#
column
的配置和其他插件的配置稍有不同,一个字段由以下配置项组成
配置项 | 是否必须 | 默认值 | 示例 | 说明 |
---|---|---|---|---|
value | 否 | 无 | Addax |
数据值,在某些情况下为必选项 |
rule | 否 | constant |
idCard |
数据生产规则,详细下面的描述 |
type | 否 | string |
double |
数据值类型 |
dateFormat | 否 | yyyy-MM-dd HH:mm:ss |
yyyy/MM/dd HH:mm:ss |
日期格式,仅在type 为 date 时有效 |
rule 说明#
该插件的字段配置核心是 rule
字段,它用来指示应该生成什么样的数据,并依据不同规则,配合其他配置选项来生产满足期望的数据。 当前 rule
的配置均为内置支持的规则,暂不支持自定义,以下详细说明
constant#
constant
是 rule
的默认配置,该规则意味着要生成的数据值由 value
配置项决定,其不做任何变更。比如
{
"value": "Addax",
"type": "string",
"rule": "constant"
}
表示该字段生产的数据值均为 Addax
incr#
incr
配置项的含义和 streamreader
插件中的 incr
含义一致,表示这是一个递增的数据生产规则,比如
{
"value": "1,2",
"rule": "incr",
"type": "long"
}
表示该字段的数据是一个长整形,数值从 1 开始,每次递增 2,也就是形成 1 开始,步长为 2 的递增数列。
该字段更详细的配置规则和注意事项,可以参考 streamreader 中的 incr
说明。
random#
random
配置项的含义和 streamreader
插件中的 random
含义一致,表示这是一个递增的数据生产规则,比如
{
"value": "1,10",
"rule": "random",
"type": "string"
}
表示该字段的数据是一个长度为 1 到 10 (1和10都包括)随机字符串。
该字段更详细的配置规则和注意事项,可以参考 streamreader 中的 random
说明。
规则名称 | 含义 | 示例 | 数据类型 | 说明 |
---|---|---|---|---|
address |
随机生成一条基本满足国内实际情况的地址信息 | 辽宁省兰州市徐汇区东山街176号 |
string | |
bank |
随机生成一个国内银行名称 | 华夏银行 |
string | |
company |
随机生成一个公司的名称 | 万迅电脑科技有限公司 |
string | |
creditCard |
随机生成一个信用卡卡号 | 430405198908214042 |
string | 16位 |
debitCard |
随机生成一个储蓄卡卡号 | 6227894836568607 |
string | 19位 |
idCard |
随机生成一个国内身份证号码 | 350600198508222018 |
string | 18位,负责校验规则,头6位编码满足行政区划要求 |
lat |
随机生成维度数据 | 48.6648764 |
double | 固定7位小数 ,也可以用latitude 表示 |
lng |
随机生成经度数据 | 120.6018163 |
double | 固定7位小数,也可以使用longitude 表示 |
name |
随机生成一个国内名字 | 池浩 |
string | 暂没考虑姓氏在国内的占比度 |
job |
随机生成一个国内岗位名称 | 系统工程师 |
string | 数据来源于招聘网站 |
phone |
随机生成一个国内手机号码 | 15292600492 |
string | 暂不考虑虚拟手机号 |
stockCode |
随机生成一个6位的股票代码 | 687461 |
string | 前两位满足国内股票代码编号规范 |
stockAccount |
随机生成一个10位的股票交易账户 | 0692522928 |
string | 完全随机,不满足账户规范 |
注意:上述表格中的规则返回的数据类型是固定的,且不支持修改,因此 type
无需配置,配置的类型也会被忽略,因为数据生成来自内部规则,所以 value
也无需配置,配置的内容也会被忽略。
Dbf Reader#
DbfReader
插件支持读取DBF格式文件
配置说明#
以下是读取 DBF 文件后打印到终端的配置样例
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "dbfreader",
"parameter": {
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "boolean"
},
{
"index": 4,
"type": "string"
},
{
"value": "dbf",
"type": "string"
}
],
"path": [
"/tmp/out"
],
"encoding": "GBK"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
}
}
]
}
}
参数说明#
parameter
配置项支持以下配置
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
path | 是 | 无 | DBF文件路径,支持写多个路径,详细情况见下 |
column | 是 | 无 | 所配置的表中需要同步的列集合, 是 {type: value} 或 {type: index} 的集合,详细配置见下 |
encoding | 否 | GBK | DBF文件编码,比如 GBK , UTF-8 |
nullFormat | 否 | \N |
定义哪个字符串可以表示为null, |
path#
描述:本地文件系统的路径信息,注意这里可以支持填写多个路径。
当指定单个本地文件,DbfFileReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取。
当指定多个本地文件,DbfFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,DbfFileReader尝试遍历出多个文件信息。例如: 指定
/*
代表读取/目录下所有的文件,指定/foo/*
代表读取foo
目录下游所有的文件。 dbfFileReader目前只支持*
作为文件通配符。
特别需要注意的是,Addax会将一个作业下同步的所有dbf File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类dbf格式,并且提供给Addax权限可读。
特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。
column#
读取字段列表,type
指定源数据的类型,name
为字段名,长度最大8,value
指定当前类型为常量,不从源头文件读取数据,而是根据 value
值自动生成对应的列。
默认情况下,用户可以全部按照 String
类型读取数据,配置如下:
{
"column": [
"*"
]
}
用户可以指定Column字段信息,配置如下:
[
{
"type": "long",
"index": 0
},
{
"type": "string",
"value": "addax"
}
]
"index": 0
表示从本地DBF文件第一列获取int字段
"value": "addax"
表示从 dbfFileReader 内部生成 addax
的字符串字段作为当前字段 对于用户指定 column
信息,type
必须填写,index
和 value
必须选择其一。
支持的数据类型#
本地文件本身提供数据类型,该类型是 Addax dbfFileReader定义:
Addax 内部类型 | 本地文件 数据类型 |
---|---|
Long | Long |
Double | Double |
String | String |
Boolean | Boolean |
Date | Date |
其中:
Long 是指本地文件文本中使用整形的字符串表示形式,例如
19901219
。Double 是指本地文件文本中使用Double的字符串表示形式,例如
3.1415
。Boolean 是指本地文件文本中使用Boolean的字符串表示形式,例如
true
、false
。不区分大小写。Date 是指本地文件文本中使用Date的字符串表示形式,例如
2014-12-31
,可以配置dateFormat
指定格式。
ElasticSearchReader#
ElasticSearchReader 插件实现了从 Elasticsearch 读取索引的功能, 它通过 Elasticsearch 提供的 Rest API (默认端口9200),执行指定的查询语句批量获取数据
示例#
假定要获取的索引内容如下
{
"took": 14,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "test-1",
"_type": "default",
"_id": "38",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
},
{
"_index": "test-1",
"_type": "default",
"_id": "103",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
}
]
}
}
配置一个从 Elasticsearch 读取数据并打印到终端的任务
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "elasticsearchreader",
"parameter": {
"endpoint": "http://127.0.0.1:9200",
"accessId": "",
"accesskey": "",
"index": "test-1",
"type": "default",
"searchType": "dfs_query_then_fetch",
"headers": {},
"scroll": "3m",
"search": [
{
"query": {
"match": {
"col_ip": "1.1.1.1"
}
},
"aggregations": {
"top_10_states": {
"terms": {
"field": "col_date",
"size": 10
}
}
}
}
],
"column": [
"col_ip",
"col_double",
"col_long",
"col_integer",
"col_keyword",
"col_text",
"col_geo_point",
"col_date"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
将上述内容保存为 job/es2stream.json
执行下面的命令进行采集
bin/addax.sh job/es2stream.json
其输出结果类似如下(输出记录数有删减)
2021-02-19 13:38:15.860 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"accessId":"",
"headers":{},
"endpoint":"http://127.0.0.1:9200",
"search":[
{
"query": {
"match": {
"col_ip": "1.1.1.1"
}
},
"aggregations": {
"top_10_states": {
"terms": {
"field": "col_date",
"size": 10
}
}
}
}
],
"accesskey":"*****",
"searchType":"dfs_query_then_fetch",
"scroll":"3m",
"column":[
"col_ip",
"col_double",
"col_long",
"col_integer",
"col_keyword",
"col_text",
"col_geo_point",
"col_date"
],
"index":"test-1",
"type":"default"
},
"name":"elasticsearchreader"
},
"writer":{
"parameter":{
"print":true,
"encoding":"UTF-8"
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"byte":-1,
"channel":1
}
}
}
2021-02-19 13:38:15.934 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO JobContainer - Set jobId = 0
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2021-02-19 13:38:19.845 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO JobContainer -
任务启动时刻 : 2021-02-19 13:38:15
任务结束时刻 : 2021-02-19 13:38:19
任务总计耗时 : 3s
任务平均流量 : 2.84KB/s
记录写入速度 : 31rec/s
读出记录总数 : 2
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
endpoint | 是 | string | 无 | ElasticSearch的连接地址 |
accessId | 否 | string | "" |
http auth中的user |
accessKey | 否 | string | "" |
http auth中的password |
index | 是 | string | 无 | elasticsearch中的index名 |
type | 否 | string | index名 | elasticsearch中index的type名 |
search | 是 | list | [] |
json格式api搜索数据体 |
column | 是 | list | 无 | 需要读取的字段 |
timeout | 否 | int | 60 | 客户端超时时间(单位:秒) |
discovery | 否 | boolean | false | 启用节点发现将(轮询)并定期更新客户机中的服务器列表 |
compression | 否 | boolean | true | http请求,开启压缩 |
multiThread | 否 | boolean | true | http请求,是否有多线程 |
searchType | 否 | string | dfs_query_then_fetch |
搜索类型 |
headers | 否 | map | {} |
http请求头 |
scroll | 否 | string | "" |
滚动分页配置 |
search#
search 配置项允许配置为满足 Elasticsearch API 查询要求的内容,比如这样:
{
"query": {
"match": {
"message": "myProduct"
}
},
"aggregations": {
"top_10_states": {
"terms": {
"field": "state",
"size": 10
}
}
}
}
Ftp Reader#
FtpReader 提供了读取远程 FTP/SFTP 文件系统数据存储的能力。
功能说明#
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "ftpreader",
"parameter": {
"protocol": "sftp",
"host": "127.0.0.1",
"port": 22,
"username": "xx",
"password": "xxx",
"path": [
"/var/pub/ftpReaderTest/data"
],
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "date",
"format": "yyyy.MM.dd"
}
],
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "ftpWriter",
"parameter": {
"path": "/var/ftp/FtpWriter/result",
"fileName": "shihf",
"writeMode": "truncate",
"format": "yyyy-MM-dd"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
protocol | 是 | 无 | ftp服务器协议,目前支持传输协议有ftp和sftp |
host | 是 | 无 | ftp服务器地址 |
port | 否 | 22/21 | 若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 |
timeout | 否 | 60000 | 连接ftp服务器连接超时时间,单位毫秒(ms) |
connectPattern | 否 | PASV | 连接模式,仅支持 PORT , PASV 模式。该参数只在传输协议是标准ftp协议时使用 |
username | 是 | 无 | ftp服务器访问用户名 |
password | 是 | 无 | ftp服务器访问密码 |
path | 是 | 无 | 远程FTP文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下 |
column | 是 | 默认String类型 | 读取字段列表,type指定源数据的类型,详见下文 |
fieldDelimiter | 是 | , |
描述:读取的字段分隔符 |
compress | 否 | 无 | 文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、gzip、bzip2 |
encoding | 否 | utf-8 | 读取文件的编码配置 |
skipHeader | 否 | false | 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过 |
nullFormat | 否 | \N |
定义哪些字符串可以表示为null |
maxTraversalLevel | 否 | 100 | 允许遍历文件夹的最大层数 |
csvReaderConfig | 否 | 无 | 读取CSV类型文件参数配置,Map类型。不配置则使用默认值,详见下文 |
path#
远程FTP文件系统的路径信息,注意这里可以支持填写多个路径。
当指定单个远程FTP文件,FtpReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取 = 当指定多个远程FTP文件,FtpReader支持使用多线程进行数据抽取。线程并发数通过通道数指定
当指定通配符,FtpReader尝试遍历出多个文件信息。例如: 指定
/*
代表读取/目录下所有的文件,指定/bazhen/*
代表读取 bazhen 目录下游所有的文件。目前只支持*
作为文件通配符。
特别需要注意的是,Addax会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类CSV格式,并且提供给Addax权限可读。 特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。
column#
读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
默认情况下,用户可以全部按照String类型读取数据,配置如下:
{
"column": [
"*"
]
}
用户可以指定Column字段信息,配置如下:
[
{
"type": "long",
"index": 0,
"description": "从远程FTP文件文本第一列获取int字段"
},
{
"type": "string",
"value": "addax",
"description": "从FtpReader内部生成alibaba的字符串字段作为当前字段"
}
]
对于用户指定Column信息,type必须填写,index/value必须选择其一。
csvReaderConfig#
常见配置:
{
"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
}
所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
类型转换#
远程FTP文件本身不提供数据类型,该类型是Addax FtpReader定义:
Addax 内部类型 | 远程FTP文件 数据类型 |
---|---|
Long | Long |
Double | Double |
String | String |
Boolean | Boolean |
Date | Date |
其中:
Long 是指远程FTP文件文本中使用整形的字符串表示形式,例如"19901219"。
Double 是指远程FTP文件文本中使用Double的字符串表示形式,例如"3.1415"。
Boolean 是指远程FTP文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。
Date 是指远程FTP文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。
限制#
单个File支持多线程并发读取,这里涉及到单个File内部切分算法
单个File在压缩情况下,从技术上无法支持多线程并发读取。
Hbase11X Reader#
Hbase11X Reader 插件支持从 HBase 1.x 版本读取数据, 其实现方式为 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey
范围内的数据。
配置#
建表以及填充数据#
以下演示基于下面创建的表以及数据
create 'users', 'address','info'
put 'users', 'lisi', 'address:country', 'china'
put 'users', 'lisi', 'address:province', 'beijing'
put 'users', 'lisi', 'info:age', 27
put 'users', 'lisi', 'info:birthday', '1987-06-17'
put 'users', 'lisi', 'info:company', 'baidu'
put 'users', 'xiaoming', 'address:city', 'hangzhou'
put 'users', 'xiaoming', 'address:country', 'china'
put 'users', 'xiaoming', 'address:province', 'zhejiang'
put 'users', 'xiaoming', 'info:age', 29
put 'users', 'xiaoming', 'info:birthday', '1987-06-17'
put 'users', 'xiaoming', 'info:company', 'alibaba'
normal 模式#
把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如:
hbase(main):017:0> scan 'users'
ROW COLUMN+CELL
lisi column=address:city, timestamp=1457101972764, value=beijing
lisi column=address:country, timestamp=1457102773908, value=china
lisi column=address:province, timestamp=1457101972736, value=beijing
lisi column=info:age, timestamp=1457101972548, value=27
lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17
lisi column=info:company, timestamp=1457101972653, value=baidu
xiaoming column=address:city, timestamp=1457082196082, value=hangzhou
xiaoming column=address:country, timestamp=1457082195729, value=china
xiaoming column=address:province, timestamp=1457082195773, value=zhejiang
xiaoming column=info:age, timestamp=1457082218735, value=29
xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17
xiaoming column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds
读取后数据
rowKey | addres:city | address:country | address:province | info:age | info:birthday | info:company |
---|---|---|---|---|---|---|
lisi | beijing | china | beijing | 27 | 1987-06-17 | baidu |
xiaoming | hangzhou | china | zhejiang | 29 | 1987-06-17 | alibaba |
multiVersionFixedColumn 模式#
把HBase中的表,当成竖表进行读取。读出的每条记录一定是四列形式,依次为:rowKey
,family:qualifier
,timestamp
,value
。
读取时需要明确指定要读取的列,把每一个 cell 中的值,作为一条记录(record),若有多个版本就有多条记录(record)。如:
hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW COLUMN+CELL
lisi column=address:city, timestamp=1457101972764, value=beijing
lisi column=address:contry, timestamp=1457102773908, value=china
lisi column=address:province, timestamp=1457101972736, value=beijing
lisi column=info:age, timestamp=1457101972548, value=27
lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17
lisi column=info:company, timestamp=1457101972653, value=baidu
xiaoming column=address:city, timestamp=1457082196082, value=hangzhou
xiaoming column=address:contry, timestamp=1457082195729, value=china
xiaoming column=address:province, timestamp=1457082195773, value=zhejiang
xiaoming column=info:age, timestamp=1457082218735, value=29
xiaoming column=info:age, timestamp=1457082178630, value=24
xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17
xiaoming column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds
读取后数据(4列)
rowKey | column:qualifier | timestamp | value |
---|---|---|---|
lisi | address:city | 1457101972764 | beijing |
lisi | address:contry | 1457102773908 | china |
lisi | address:province | 1457101972736 | beijing |
lisi | info:age | 1457101972548 | 27 |
lisi | info:birthday | 1457101972604 | 1987-06-17 |
lisi | info:company | 1457101972653 | beijing |
xiaoming | address:city | 1457082196082 | hangzhou |
xiaoming | address:contry | 1457082195729 | china |
xiaoming | address:province | 1457082195773 | zhejiang |
xiaoming | info:age | 1457082218735 | 29 |
xiaoming | info:age | 1457082178630 | 24 |
xiaoming | info:birthday | 1457082186830 | 1987-06-17 |
xiaoming | info:company | 1457082189826 | alibaba |
配置样例#
配置一个从 HBase 抽取数据到本地的作业:(normal 模式)
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "xxxf"
},
"table": "users",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
},
{
"name": "info: birthday",
"type": "date",
"format": "yyyy-MM-dd"
},
{
"name": "info: company",
"type": "string"
},
{
"name": "address: country",
"type": "string"
},
{
"name": "address: province",
"type": "string"
},
{
"name": "address: city",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
"fileName": "qiran",
"writeMode": "truncate"
}
}
}
]
}
}
配置一个从 HBase 抽取数据到本地的作业:( multiVersionFixedColumn 模式)
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "127.0.0.1:2181"
},
"table": "users",
"encoding": "utf-8",
"mode": "multiVersionFixedColumn",
"maxVersion": "-1",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
},
{
"name": "info: birthday",
"type": "date",
"format": "yyyy-MM-dd"
},
{
"name": "info: company",
"type": "string"
},
{
"name": "address: contry",
"type": "string"
},
{
"name": "address: province",
"type": "string"
},
{
"name": "address: city",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": ""
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
"fileName": "qiran",
"writeMode": "truncate"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
hbaseConfig | 是 | 无 | 连接HBase集群需要的配置信息, hbase.zookeeper.quorum 为必填项,其他 HBase client的配置为可选项 |
mode | 是 | 无 | 读取hbase的模式,可填写 normal 或 multiVersionFixedColumn |
table | 是 | 无 | 要读取的 hbase 表名(大小写敏感) |
encoding | 否 | UTF-8 | 编码方式,UTF-8 或是 GBK ,用于对二进制存储的 HBase byte[] 转为 String 时的编码 |
column | 是 | 无 | 要读取的hbase字段,normal 模式与 multiVersionFixedColumn 模式下必填项, 详细说明见下文 |
maxVersion | 是 | 无 | 指定在多版本模式下读取的版本数,-1 表示读取所有版本, multiVersionFixedColumn 模式下必填 |
range | 否 | 无 | 指定读取的rowkey 范围, 详见下文 |
scanCacheSize | 否 | 256 | Hbase client 每次从服务器端读取的行数 |
scanBatchSize | 否 | 100 | Hbase client 每次从服务器端读取的列数 |
column#
描述:要读取的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项。
normal 模式#
name
指定读取的 hbase 列,除了 rowkey
外,必须为 列族:列名
的格式,type
指定源数据的类型,format
指定日期类型的格式,
value
指定当前类型为常量,不从 hbase 读取数据,而是根据 value
值自动生成对应的列。配置格式如下:
{
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"value": "test",
"type": "string"
}
]
}
normal 模式下,对于用户指定Column信息,type必须填写,name/value必须选择其一。
multiVersionFixedColumn 模式#
name
指定读取的 hbase 列,除了 rowkey
外,必须为 列族:列名
的格式,type
指定源数据的类型,format
指定日期类型的格式 。
multiVersionFixedColumn 模式下不支持常量列。配置格式如下:
{
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
}
]
}
range#
指定读取的 rowkey
范围
startRowkey
:指定开始rowkey
endRowkey
指定结束rowkey
isBinaryRowkey
:指定配置的startRowkey
和endRowkey
转换为byte[]
时的方式,默认值为false,若为true,则调用Bytes.toBytesBinary(rowkey)
方法进行转换;若为false:则调用Bytes.toBytes(rowkey)
配置格式如下:
{
"range": {
"startRowkey": "aaa",
"endRowkey": "ccc",
"isBinaryRowkey": false
}
}
类型转换#
下面列出支持的读取HBase数据类型,HbaseReader 针对 HBase 类型转换列表:
Addax 内部类型 | HBase 数据类型 |
---|---|
Long | int, short ,long |
Double | float, double |
String | string, binarystring |
Date | date |
Boolean | boolean |
请注意:
除上述罗列字段类型外,其他类型均不支持
限制#
目前不支持动态列的读取。考虑网络传输流量(支持动态列,需要先将hbase所有列的数据读取出来,再按规则进行过滤),现支持的两种读取模式中需要用户明确指定要读取的列。
关于同步作业的切分:目前的切分方式是根据用户hbase表数据的region分布进行切分。即:在用户填写的
[startrowkey,endrowkey]
范围内,一个region会切分成一个task,单个region不进行切分。multiVersionFixedColumn模式下不支持增加常量列
Hbase20XReader#
Hbase20X Reader 插件支持从 HBase 2.x 版本读取数据, 其实现方式为 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey
范围内的数据。
配置#
以下演示基于下面创建的表以及数据
create 'users', {NAME=>'address', VERSIONS=>100},{NAME=>'info',VERSIONS=>1000}
put 'users', 'lisi', 'address:country', 'china1', 20200101
put 'users', 'lisi', 'address:province', 'beijing1', 20200101
put 'users', 'lisi', 'info:age', 27, 20200101
put 'users', 'lisi', 'info:birthday', '1987-06-17', 20200101
put 'users', 'lisi', 'info:company', 'baidu1', 20200101
put 'users', 'xiaoming', 'address:city', 'hangzhou1', 20200101
put 'users', 'xiaoming', 'address:country', 'china1', 20200101
put 'users', 'xiaoming', 'address:province', 'zhejiang1',20200101
put 'users', 'xiaoming', 'info:age', 29, 20200101
put 'users', 'xiaoming', 'info:birthday', '1987-06-17',20200101
put 'users', 'xiaoming', 'info:company', 'alibaba1', 20200101
put 'users', 'lisi', 'address:country', 'china2', 20200102
put 'users', 'lisi', 'address:province', 'beijing2', 20200102
put 'users', 'lisi', 'info:age', 27, 20200102
put 'users', 'lisi', 'info:birthday', '1987-06-17', 20200102
put 'users', 'lisi', 'info:company', 'baidu2', 20200102
put 'users', 'xiaoming', 'address:city', 'hangzhou2', 20200102
put 'users', 'xiaoming', 'address:country', 'china2', 20200102
put 'users', 'xiaoming', 'address:province', 'zhejiang2', 20200102
put 'users', 'xiaoming', 'info:age', 29, 20200102
put 'users', 'xiaoming', 'info:birthday', '1987-06-17', 20200102
put 'users', 'xiaoming', 'info:company', 'alibaba2', 20200102
normal 模式#
把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如:
hbase(main):017:0> scan 'users'
ROW COLUMN+CELL
lisi column=address:city, timestamp=1457101972764, value=beijing
lisi column=address:country, timestamp=1457102773908, value=china
lisi column=address:province, timestamp=1457101972736, value=beijing
lisi column=info:age, timestamp=1457101972548, value=27
lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17
lisi column=info:company, timestamp=1457101972653, value=baidu
xiaoming column=address:city, timestamp=1457082196082, value=hangzhou
xiaoming column=address:country, timestamp=1457082195729, value=china
xiaoming column=address:province, timestamp=1457082195773, value=zhejiang
xiaoming column=info:age, timestamp=1457082218735, value=29
xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17
xiaoming column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds
读取后数据
rowKey | addres:city | address:country | address:province | info:age | info:birthday | info:company |
---|---|---|---|---|---|---|
lisi | beijing | china | beijing | 27 | 1987-06-17 | baidu |
xiaoming | hangzhou | china | zhejiang | 29 | 1987-06-17 | alibaba |
multiVersionFixedColumn 模式#
把HBase中的表,当成竖表进行读取。读出的每条记录一定是四列形式,依次为:rowKey
,family:qualifier
,timestamp
,value
。
读取时需要明确指定要读取的列,把每一个 cell 中的值,作为一条记录(record),若有多个版本就有多条记录(record)。如:
hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW COLUMN+CELL
lisi column=address:city, timestamp=1457101972764, value=beijing
lisi column=address:contry, timestamp=1457102773908, value=china
lisi column=address:province, timestamp=1457101972736, value=beijing
lisi column=info:age, timestamp=1457101972548, value=27
lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17
lisi column=info:company, timestamp=1457101972653, value=baidu
xiaoming column=address:city, timestamp=1457082196082, value=hangzhou
xiaoming column=address:contry, timestamp=1457082195729, value=china
xiaoming column=address:province, timestamp=1457082195773, value=zhejiang
xiaoming column=info:age, timestamp=1457082218735, value=29
xiaoming column=info:age, timestamp=1457082178630, value=24
xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17
xiaoming column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds
读取后数据(4列)
rowKey | column:qualifier | timestamp | value |
---|---|---|---|
lisi | address:city | 1457101972764 | beijing |
lisi | address:contry | 1457102773908 | china |
lisi | address:province | 1457101972736 | beijing |
lisi | info:age | 1457101972548 | 27 |
lisi | info:birthday | 1457101972604 | 1987-06-17 |
lisi | info:company | 1457101972653 | beijing |
xiaoming | address:city | 1457082196082 | hangzhou |
xiaoming | address:contry | 1457082195729 | china |
xiaoming | address:province | 1457082195773 | zhejiang |
xiaoming | info:age | 1457082218735 | 29 |
xiaoming | info:age | 1457082178630 | 24 |
xiaoming | info:birthday | 1457082186830 | 1987-06-17 |
xiaoming | info:company | 1457082189826 | alibaba |
配置一个从 HBase 抽取数据到本地的作业:(normal 模式)
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "xxxf"
},
"table": "users",
"encoding": "utf-8",
"mode": "normal",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
},
{
"name": "info: birthday",
"type": "date",
"format": "yyyy-MM-dd"
},
{
"name": "info: company",
"type": "string"
},
{
"name": "address: country",
"type": "string"
},
{
"name": "address: province",
"type": "string"
},
{
"name": "address: city",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": "",
"isBinaryRowkey": true
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
"fileName": "qiran",
"writeMode": "truncate"
}
}
}
]
}
}
配置一个从 HBase 抽取数据到本地的作业:( multiVersionFixedColumn 模式)
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "xxx"
},
"table": "users",
"encoding": "utf-8",
"mode": "multiVersionFixedColumn",
"maxVersion": "-1",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
},
{
"name": "info: birthday",
"type": "date",
"format": "yyyy-MM-dd"
},
{
"name": "info: company",
"type": "string"
},
{
"name": "address: contry",
"type": "string"
},
{
"name": "address: province",
"type": "string"
},
{
"name": "address: city",
"type": "string"
}
],
"range": {
"startRowkey": "",
"endRowkey": ""
}
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
"fileName": "qiran",
"writeMode": "truncate"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
hbaseConfig | 是 | 无 | 连接HBase集群需要的配置信息, hbase.zookeeper.quorum 为必填项,其他 HBase client的配置为可选项 |
mode | 是 | 无 | 读取hbase的模式,可填写 normal 或 multiVersionFixedColumn |
table | 是 | 无 | 要读取的 hbase 表名(大小写敏感) |
encoding | 否 | UTF-8 | 编码方式,UTF-8 或是 GBK ,用于对二进制存储的 HBase byte[] 转为 String 时的编码 |
column | 是 | 无 | 要读取的字段,normal 模式与 multiVersionFixedColumn 模式下必填项, 详细说明见下文 |
maxVersion | 是 | 无 | 指定在多版本模式下读取的版本数,-1 表示读取所有版本, multiVersionFixedColumn 模式下必填 |
range | 否 | 无 | 指定读取的 rowkey 范围, 详见下文 |
scanCacheSize | 否 | 256 | 每次从服务器端读取的行数 |
scanBatchSize | 否 | 100 | 每次从服务器端读取的列数 |
column#
描述:要读取的字段,normal
模式与 multiVersionFixedColumn
模式下必填项。
normal 模式#
name
指定读取的 hbase 列,除了 rowkey
外,必须为 列族:列名
的格式,type
指定源数据的类型,format
指定日期类型的格式,
value
指定当前类型为常量,不从 hbase 读取数据,而是根据 value
值自动生成对应的列。配置格式如下:
{
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"value": "test",
"type": "string"
}
]
}
normal 模式下,对于用户指定Column信息,type必须填写,name/value必须选择其一。
multiVersionFixedColumn 模式#
name
指定读取的 hbase 列,除了 rowkey
外,必须为 列族:列名
的格式,type
指定源数据的类型,format
指定日期类型的格式 。
multiVersionFixedColumn
模式下不支持常量列。
配置格式如下:
{
"mode": "multiVersionFixedColumn",
"maxVersion": 3,
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "info: age",
"type": "string"
}
]
}
range#
指定读取的 rowkey
范围
startRowkey
:指定开始rowkey
endRowkey
: 指定结束rowkey
isBinaryRowkey
:指定配置的startRowkey
和endRowkey
转换为byte[]
时的方式,默认值为false,若为true,则调用Bytes.toBytesBinary(rowkey)
方法进行转换;若为false:则调用Bytes.toBytes(rowkey)
配置格式如下:
{
"range": {
"startRowkey": "aaa",
"endRowkey": "ccc",
"isBinaryRowkey": false
}
}
类型转换#
下面列出支持的读取HBase数据类型:
Addax 内部类型 | HBase 数据类型 |
---|---|
Long | int, short ,long |
Double | float, double |
String | string, binarystring |
Date | date |
Boolean | boolean |
请注意:
除上述罗列字段类型外,其他类型均不支持
限制#
目前不支持动态列的读取。考虑网络传输流量(支持动态列,需要先将hbase所有列的数据读取出来,再按规则进行过滤),现支持的两种读取模式中需要用户明确指定要读取的列。
关于同步作业的切分:目前的切分方式是根据用户hbase表数据的region分布进行切分。即:在用户填写的
[startrowkey,endrowkey]
范围内,一个region会切分成一个task,单个region不进行切分。multiVersionFixedColumn
模式下不支持增加常量列
hbase11xsql reader#
hbase11xsqlreader 插件实现了从Phoenix(HBase SQL)读取数据, 支持的 HBase 版本为 1.x
配置样例#
配置一个从Phoenix同步抽取数据到本地的作业:
{
"job": {
"setting": {
"speed": {
"byte":-1,
"channel": 1
}
},
"content": [ {
"reader": {
"name": "hbase11xsqlreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "node1,node2,node3"
},
"table": "US_POPULATION",
"column": [],
"where": "1=1",
"querySql": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true,
"encoding": "UTF-8"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
hbaseConfig | 是 | 无 | 需要通过 Phoenix 客户端去连接 hbase 集群,因此这里需要填写对应 hbase 集群的 zkurl 地址 |
table | 是 | 无 | 指定 Phoenix 中的表名,如果有 namespace,该值设置为 namespace.tablename |
querySql | 否 | 无 | 不是直接查询表,而是提供具体的查询语句,如果该参数和 table 参数同时存在,则优先使用该参数 |
column | 是 | 无 | 填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值或 "*" 表示读取所有列 |
where | 否 | 无 | where 条件 |
类型转换#
目前支持大部分 Phoenix类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出类型转换列表:
Addax 内部类型 | Phoenix 数据类型 |
---|---|
String | CHAR, VARCHAR |
Bytes | BINARY, VARBINARY |
Bool | BOOLEAN |
Long | INTEGER, TINYINT, SMALLINT, BIGINT |
Double | FLOAT, DECIMAL, DOUBLE, |
Date | DATE, TIME, TIMESTAMP |
hbase20xsql Reader#
hbase20xsqlreade r插件实现了从Phoenix(HBase SQL)读取数据,对应版本为 HBase2.X 和 Phoenix5.X。
配置样例#
配置一个从Phoenix同步抽取数据到本地的作业:
{
"job": {
"content": [
{
"reader": {
"name": "hbase20xsqlreader",
"parameter": {
"queryServerAddress": "http://127.0.0.1:8765",
"serialization": "PROTOBUF",
"table": "TEST",
"column": [
"ID",
"NAME"
],
"splitKey": "ID"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 3,
"bytes": -1
}
}
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
queryServerAddress | 是 | 无 | Phoenix QueryServer 地址, 该插件通过 PQS 进行连接 |
serialization | 否 | PROTOBUF | QueryServer使用的序列化协议 |
table | 是 | 无 | 所要读取表名 |
schema | 否 | 无 | 表所在的schema |
column | 否 | 全部列 | 填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列 |
splitKey | 是 | 无 | 根据数据特征动态指定切分点,对表数据按照指定的列的最大、最小值进行切分,仅支持整型和字符串类型 |
splitPoints | 否 | 无 | 按照表的split进行切分 |
where | 否 | 无 | 支持对表查询增加过滤条件,每个切分都会携带该过滤条件 |
querySql | 否 | 无 | 支持指定多个查询语句,但查询列类型和数目必须保持一致 |
类型转换#
目前支持大部分 Phoenix 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
Addax 内部类型 | Phoenix 数据类型 |
---|---|
String | CHAR, VARCHAR |
Bytes | BINARY, VARBINARY |
Bool | BOOLEAN |
Long | INTEGER, TINYINT, SMALLINT, BIGINT |
Double | FLOAT, DECIMAL, DOUBLE, |
Date | DATE, TIME, TIMESTAMP |
约束限制#
切分表时切分列仅支持单个列,且该列必须是表主键
不设置
splitPoint
默认使用自动切分,此时切分列仅支持整形和字符型表名和
SCHEMA
名及列名大小写敏感,请与 Phoenix 表实际大小写保持一致仅支持通过 Phoenix QueryServer 读取数据,因此您的 Phoenix 必须启动 QueryServer 服务才能使用本插件
HDFS Reader#
HdfsReade r提供了读取分布式文件系统数据存储的能力。
目前HdfsReader支持的文件格式如下:
textfile(text)
orcfile(orc)
rcfile(rc)
sequence file(seq)
Csv(csv)
parquet
功能与限制#
支持textfile、orcfile、parquet、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量
支持递归读取、支持正则表达式(
*
和?
)。支持常见的压缩算法,包括 GZIP, SNAPPY, ZLIB等。
多个File可以支持并发读取。
支持sequence file数据压缩,目前支持lzo压缩方式。
csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。
目前插件中Hive版本为
3.1.1
,Hadoop版本为3.1.1
, 在Hadoop2.7.x
, Hadoop3.1.x
和Hive2.x
, hive3.1.x
测试环境中写入正常;其它版本理论上都支持,但在生产环境使用前,请进一步测试;支持
kerberos
认证
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 3,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/mytable01/*",
"defaultFS": "hdfs://xxx:port",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"type": "string",
"value": "hello"
},
{
"index": 2,
"type": "double"
}
],
"fileType": "orc",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
配置项说明#
配置项 | 是否必须 | 默认值 | 说明 |
---|---|---|---|
path | 是 | 无 | 要读取的文件路径 |
defaultFS | 是 | 无 | Hadoop hdfs 文件系统 NAMENODE 节点地址,如果配置了 HA 模式,则为 defaultFS 的值 |
fileType | 是 | 无 | 文件的类型 |
column | 是 | 无 | 读取字段列表 |
fieldDelimiter | 否 | , |
指定文本文件的字段分隔符,二进制文件不需要指定该项 |
encoding | 否 | utf-8 |
读取文件的编码配置, 目前仅支持 utf-8 |
nullFormat | 否 | 无 | 自定义哪些字符可以表示为空,例如如果用户配置: "\\N" ,那么如果源头数据是 "\N" ,视作 null 字段 |
haveKerberos | 否 | 无 | 是否启用 Kerberos 认证,如果启用,则需要同时配置 kerberosKeytabFilePath ,kerberosPrincipal |
kerberosKeytabFilePath | 否 | 无 | 用于 Kerberos 认证的凭证文件路径, 比如 /your/path/addax.service.keytab |
kerberosPrincipal | 否 | 无 | 用于 Kerberos 认证的凭证主体, 比如 addax/node1@WGZHAO.COM |
compress | 否 | 无 | 指定要读取的文件的压缩格式 |
hadoopConfig | 否 | 无 | 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置 |
path#
要读取的文件路径,如果要读取多个文件,可以使用正则表达式 *
,注意这里可以支持填写多个路径:
当指定单个Hdfs文件,HdfsReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取。
当指定多个Hdfs文件,HdfsReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,HdfsReader尝试遍历出多个文件信息。例如: 指定
/*
代表读取/
目录下所有的文件,指定/bazhen/*
代表读取 bazhen 目录下游所有的文件。HdfsReader目前只支持*
和?
作为文件通配符。
特别需要注意的是,Addax 会将一个作业下同步的所有的文件视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。并且提供给Addax权限可读。
fileType#
描述:文件的类型,目前只支持用户配置为
text 表示
textfile
文件格式orc 表示
orcfile
文件格式rc 表示
rcfile
文件格式seq 表示
sequence file
文件格式csv 表示普通 hdfs 文件格式(逻辑二维表)
parquet 表示
parquet
文件格式
特别需要注意的是,HdfsReader能够自动识别文件是 orcfile
、textfile
或者还是其它类型的文件,但该项是必填项,HdfsReader则会只读取用户配置的类型的文件,忽略路径下其他格式的文件
另外需要注意的是,由于 textfile
和 orcfile
是两种完全不同的文件格式,所以HdfsReader对这两种文件的解析方式也存在差异,这种差异导致hive支持的复杂复合类型(比如map,array,struct,union)在转换为支持的String类型时,转换的结果格式略有差异,比如以map类型为例:
orcfile
: map类型转换成 string 类型后,结果为{job=80, team=60, person=70}
textfile
: map类型转换成 string 类型后,结果为job:80,team:60,person:70
从上面的转换结果可以看出,数据本身没有变化,但是表示的格式略有差异,所以如果用户配置的文件路径中要同步的字段在Hive中是复合类型的话,建议配置统一的文件格式。
如果需要统一复合类型解析出来的格式,我们建议用户在hive客户端将 textfile
格式的表导成 orcfile
格式的表
column#
读取字段列表,type
指定源数据的类型,index
指定当前列来自于文本第几列(以0开始),value
指定当前类型为常量,不从源头文件读取数据,而是根据 value
值自动生成对应的列。
默认情况下,用户可以全部按照String类型读取数据,配置如下:
{
"column": [
"*"
]
}
用户可以指定Column字段信息,配置如下:
[
{
"type": "long",
"index": 0,
"description": "从本地文件文本第一列获取int字段"
},
{
"type": "string",
"value": "addax",
"description": "HdfsReader内部生成alibaba的字符串字段作为当前字段"
}
]
对于用户指定Column信息,type必须填写,index/value必须选择其一。
compress#
当fileType(文件类型)为csv下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snappy、framing-snappy压缩; 值得注意的是,lzo存在两种压缩格式:lzo和lzo_deflate,用户在配置的时候需要留心,不要配错了;另外,由于snappy目前没有统一的stream format,addax目前只支持最主流的两种:hadoop-snappy(hadoop上的snappy stream format)和 framing-snappy(google建议的snappy stream format);
hadoopConfig#
hadoopConfig
里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置
{
"hadoopConfig": {
"dfs.nameservices": "cluster",
"dfs.ha.namenodes.cluster": "nn1,nn2",
"dfs.namenode.rpc-address.cluster.nn1": "node1.example.com:8020",
"dfs.namenode.rpc-address.cluster.nn2": "node2.example.com:8020",
"dfs.client.failover.proxy.provider.cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}
这里的 cluster
表示 HDFS 配置成HA时的名字,也是 defaultFS
配置项中的名字 如果实际环境中的名字不是 cluster
,则上述配置中所有写有 cluster
都需要替换
csvReaderConfig#
读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。
常见配置:
{
"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
}
所有配置项及默认值,配置时 csvReaderConfig 的map中请 严格按照以下字段名字进行配置:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
类型转换#
Addax 内部类型 | Hive表 数据类型 |
---|---|
Long | TINYINT, SMALLINT, INT, BIGINT |
Double | FLOAT, DOUBLE |
String | String, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, BINARY |
Boolean | BOOLEAN |
Date | Date, TIMESTAMP |
Bytes | BINARY |
其中:
Long 是指Hdfs文件文本中使用整形的字符串表示形式,例如
123456789
Double 是指Hdfs文件文本中使用Double的字符串表示形式,例如
3.1415
Boolean 是指Hdfs文件文本中使用Boolean的字符串表示形式,例如
true
、false
。不区分大小写。Date 是指Hdfs文件文本中使用Date的字符串表示形式,例如
2014-12-31
Bytes 是指HDFS文件中使用二进制存储的内容,比如一张图片的数据
特别提醒:
Hive支持的数据类型 TIMESTAMP 可以精确到纳秒级别,所以
textfile
、orcfile
中TIMESTAMP
存放的数据类似于2015-08-21 22:40:47.397898389
, 如果转换的类型配置为Addax的Date,转换之后会导致纳秒部分丢失,所以如果需要保留纳秒部分的数据,请配置转换类型为String
类型。
FAQ#
Q: 如果报java.io.IOException: Maximum column length of 100,000 exceeded in column...异常信息,说明数据源column字段长度超过了100000字符。
A: 需要在json的reader里增加如下配置
{
"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
}
safetySwitch = false
表示单列长度不限制100000字符
Http Reader#
HttpReader 插件实现了读取 Restful API 数据的能力
示例#
示例接口与数据#
以下配置演示了如何从一个指定的 API 中获取数据,假定访问的接口为:
http://127.0.0.1:9090/mock/17/LDJSC/ASSET
走 GET 请求,请求的参数有
参数名称 | 参数值示例 |
---|---|
CURR_DATE | 2021-01-17 |
DEPT | 9400 |
USERNAME | andi |
以下是访问的数据样例,(实际返回数据略有不同)
{
"result": [
{
"CURR_DATE": "2019-12-09",
"DEPT": "9700",
"TOTAL_MANAGED_MARKET_VALUE": 1581.03,
"TOTAL_MANAGED_MARKET_VALUE_GROWTH": 36.75,
"TMMARKET_VALUE_DOD_GROWTH_RATE": -0.009448781026677719,
"TMMARKET_VALUE_GROWTH_MON": -0.015153586011995693,
"TMMARKET_VALUE_GROWTH_YEAR": 0.0652347643813081,
"TMMARKET_VALUE_SHARECOM": 0.024853621341525287,
"TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.005242133578517903,
"AVERAGE_NEW_ASSETS_DAYINMON": 1645.1193961136973,
"YEAR_NEW_ASSET_SSHARECOM": 0.16690149257388515,
"YN_ASSET_SSHARECOM_GROWTH_RATE": 0.017886267801303465,
"POTENTIAL_LOST_ASSETS": 56.76,
"TOTAL_LIABILITIES": 57.81,
"TOTAL_ASSETS": 1306.33,
"TOTAL_ASSETS_DOD_GROWTH": 4.79,
"TOTAL_ASSETS_DOD_GROWTH_RATE": -0.006797058194980485,
"NEW_ASSETS_DAY": 14.92,
"NEW_ASSETS_MON": 90.29,
"NEW_ASSETS_YEAR": 297.32,
"NEW_ASSETS_DOD_GROWTH_RATE": -0.04015576541561927,
"NEW_FUNDS_DAY": 18.16,
"INFLOW_FUNDS_DAY": 2.12,
"OUTFLOW_FUNDS_DAY": 9.73,
"OVERALL_POSITION": 0.810298404938773,
"OVERALL_POSITION_DOD_GROWTH_RATE": -0.03521615634095476,
"NEW_CUST_FUNDS_MON": 69.44,
"INFLOW_FUNDS_MONTH": 62.26,
"OUTFLOW_FUNDS_MONTH": 32.59
},
{
"CURR_DATE": "2019-08-30",
"DEPT": "8700",
"TOTAL_MANAGED_MARKET_VALUE": 1596.74,
"TOTAL_MANAGED_MARKET_VALUE_GROWTH": 41.86,
"TMMARKET_VALUE_DOD_GROWTH_RATE": 0.03470208565515685,
"TMMARKET_VALUE_GROWTH_MON": 0.07818120801111743,
"TMMARKET_VALUE_GROWTH_YEAR": -0.05440250244736409,
"TMMARKET_VALUE_SHARECOM": 0.09997733019626448,
"TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.019726478499825697,
"AVERAGE_NEW_ASSETS_DAYINMON": 1007.9314679742108,
"YEAR_NEW_ASSET_SSHARECOM": 0.15123738798885086,
"YN_ASSET_SSHARECOM_GROWTH_RATE": 0.04694052069678048,
"POTENTIAL_LOST_ASSETS": 52.48,
"TOTAL_LIABILITIES": 55.28,
"TOTAL_ASSETS": 1366.72,
"TOTAL_ASSETS_DOD_GROWTH": 10.12,
"TOTAL_ASSETS_DOD_GROWTH_RATE": 0.009708491982487952,
"NEW_ASSETS_DAY": 12.42,
"NEW_ASSETS_MON": 41.14,
"NEW_ASSETS_YEAR": 279.32,
"NEW_ASSETS_DOD_GROWTH_RATE": -0.025878627161898062,
"NEW_FUNDS_DAY": 3.65,
"INFLOW_FUNDS_DAY": 14.15,
"OUTFLOW_FUNDS_DAY": 17.08,
"OVERALL_POSITION": 0.9098432997243932,
"OVERALL_POSITION_DOD_GROWTH_RATE": 0.02111922282868306,
"NEW_CUST_FUNDS_MON": 57.21,
"INFLOW_FUNDS_MONTH": 61.16,
"OUTFLOW_FUNDS_MONTH": 15.83
},
{
"CURR_DATE": "2019-06-30",
"DEPT": "6501",
"TOTAL_MANAGED_MARKET_VALUE": 1506.72,
"TOTAL_MANAGED_MARKET_VALUE_GROWTH": -13.23,
"TMMARKET_VALUE_DOD_GROWTH_RATE": -0.0024973354204176554,
"TMMARKET_VALUE_GROWTH_MON": -0.015530793150701896,
"TMMARKET_VALUE_GROWTH_YEAR": -0.08556724628979398,
"TMMARKET_VALUE_SHARECOM": 0.15000077963967678,
"TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.049629446804825755,
"AVERAGE_NEW_ASSETS_DAYINMON": 1250.1040863177336,
"YEAR_NEW_ASSET_SSHARECOM": 0.19098445630488178,
"YN_ASSET_SSHARECOM_GROWTH_RATE": -0.007881179708853471,
"POTENTIAL_LOST_ASSETS": 50.53,
"TOTAL_LIABILITIES": 56.62,
"TOTAL_ASSETS": 1499.53,
"TOTAL_ASSETS_DOD_GROWTH": 29.56,
"TOTAL_ASSETS_DOD_GROWTH_RATE": -0.02599813232345556,
"NEW_ASSETS_DAY": 28.81,
"NEW_ASSETS_MON": 123.24,
"NEW_ASSETS_YEAR": 263.63,
"NEW_ASSETS_DOD_GROWTH_RATE": 0.0073986669331394875,
"NEW_FUNDS_DAY": 18.52,
"INFLOW_FUNDS_DAY": 3.26,
"OUTFLOW_FUNDS_DAY": 6.92,
"OVERALL_POSITION": 0.8713692113306709,
"OVERALL_POSITION_DOD_GROWTH_RATE": 0.02977644553289545,
"NEW_CUST_FUNDS_MON": 85.14,
"INFLOW_FUNDS_MONTH": 23.35,
"OUTFLOW_FUNDS_MONTH": 92.95
},
{
"CURR_DATE": "2019-12-07",
"DEPT": "8705",
"TOTAL_MANAGED_MARKET_VALUE": 1575.85,
"TOTAL_MANAGED_MARKET_VALUE_GROWTH": 8.94,
"TMMARKET_VALUE_DOD_GROWTH_RATE": -0.04384846980627058,
"TMMARKET_VALUE_GROWTH_MON": -0.022962456288549656,
"TMMARKET_VALUE_GROWTH_YEAR": -0.005047009316021089,
"TMMARKET_VALUE_SHARECOM": 0.07819484815809447,
"TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.008534369960890256,
"AVERAGE_NEW_ASSETS_DAYINMON": 1340.0339240689955,
"YEAR_NEW_ASSET_SSHARECOM": 0.19019952857677042,
"YN_ASSET_SSHARECOM_GROWTH_RATE": 0.01272353909992914,
"POTENTIAL_LOST_ASSETS": 54.63,
"TOTAL_LIABILITIES": 53.17,
"TOTAL_ASSETS": 1315.08,
"TOTAL_ASSETS_DOD_GROWTH": 49.31,
"TOTAL_ASSETS_DOD_GROWTH_RATE": 0.0016538407028265922,
"NEW_ASSETS_DAY": 29.17,
"NEW_ASSETS_MON": 44.75,
"NEW_ASSETS_YEAR": 172.87,
"NEW_ASSETS_DOD_GROWTH_RATE": 0.045388692595736746,
"NEW_FUNDS_DAY": 18.46,
"INFLOW_FUNDS_DAY": 12.93,
"OUTFLOW_FUNDS_DAY": 10.38,
"OVERALL_POSITION": 0.8083127036694828,
"OVERALL_POSITION_DOD_GROWTH_RATE": -0.02847453515632541,
"NEW_CUST_FUNDS_MON": 49.74,
"INFLOW_FUNDS_MONTH": 81.93,
"OUTFLOW_FUNDS_MONTH": 18.17
}
]
}
我们需要把 result
结果中的部分 key 值数据获取
配置#
以下配置实现从接口获取数据并打印到终端
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "httpreader",
"parameter": {
"connection": [
{
"url": "http://127.0.0.1:9090/mock/17/LDJSC/ASSET",
"proxy": {
"host": "http://127.0.0.1:3128",
"auth": "user:pass"
}
}
],
"reqParams": {
"CURR_DATE":"2021-01-18",
"DEPT":"9700"
},
"resultKey":"result",
"method": "GET",
"column": ["CURR_DATE","DEPT","TOTAL_MANAGED_MARKET_VALUE","TOTAL_MANAGED_MARKET_VALUE_GROWTH"],
"username": "user",
"password": "passw0rd",
"headers": {
"X-Powered-by": "Addax"
}
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
}
}
]
}
}
将上述内容保存为 job/httpreader2stream.json
文件。
执行#
执行以下命令,进行采集
bin/addax.sh job/httpreader2stream.json
上述命令的输出结果大致如下:
2021-01-20 09:07:41.864 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-01-20 09:07:41.877 [main] INFO Engine - the machine info =>
osInfo: Mac OS X x86_64 10.15.1
jvmInfo: AdoptOpenJDK 14 14.0.2+12
cpu num: 8
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [G1 Young Generation, G1 Old Generation]
MEMORY_NAME | allocation_size | init_size
CodeHeap 'profiled nmethods' | 117.21MB | 2.44MB
G1 Old Gen | 2,048.00MB | 39.00MB
G1 Survivor Space | -0.00MB | 0.00MB
CodeHeap 'non-profiled nmethods' | 117.21MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
Metaspace | -0.00MB | 0.00MB
G1 Eden Space | -0.00MB | 25.00MB
CodeHeap 'non-nmethods' | 5.57MB | 2.44MB
2021-01-20 09:07:41.903 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"reqParams":{
"CURR_DATE":"2021-01-18",
"DEPT":"9700"
},
"method":"GET",
"column":[
"CURR_DATE",
"DEPT",
"TOTAL_MANAGED_MARKET_VALUE",
"TOTAL_MANAGED_MARKET_VALUE_GROWTH"
],
"resultKey":"result",
"connection":[
{
"url":"http://127.0.0.1:9090/mock/17/LDJSC/ASSET"
}
]
},
"name":"httpreader"
},
"writer":{
"parameter":{
"print":"true"
},
"name":"streamwriter"
}
}
],
"setting":{
"speed":{
"bytes":-1,
"channel":1
}
}
}
2021-01-20 09:07:41.926 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-01-20 09:07:41.927 [main] INFO JobContainer - Addax jobContainer starts job.
2021-01-20 09:07:41.928 [main] INFO JobContainer - Set jobId = 0
2021-01-20 09:07:42.002 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2019-08-30 9700 1539.85 -14.78
2019-10-01 9700 1531.71 47.66
2020-12-03 9700 1574.38 7.34
2020-11-31 9700 1528.13 41.62
2019-03-01 9700 1554.28 -9.29
2021-01-20 09:07:45.006 [job-0] INFO JobContainer -
任务启动时刻 : 2021-01-20 09:07:41
任务结束时刻 : 2021-01-20 09:07:44
任务总计耗时 : 3s
任务平均流量 : 42B/s
记录写入速度 : 1rec/s
读出记录总数 : 5
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 数据类型 | 默认值 | 说明 |
---|---|---|---|---|
url | 是 | string | 无 | 要访问的HTTP地址 |
reqParams | 否 | map | 无 | 接口请求参数 |
resultKey | 否 | string | 无 | 要获取结果的那个key值,如果是获取整个返回值,则可以不用填写 |
method | 否 | string | get | 请求模式,仅支持GET,POST两种,不区分大小写 |
column | 是 | list | 无 | 要获取的key,如果配置为 "*" ,则表示获取所有key的值 |
username | 否 | string | 无 | 接口请求需要的认证帐号(如有) |
password | 否 | string | 无 | 接口请求需要的密码(如有) |
proxy | 否 | map | 无 | 代理地址,详见下面描述 |
headers | 否 | map | 无 | 定制的请求头信息 |
proxy#
如果访问的接口需要通过代理,则可以配置 proxy
配置项,该配置项是一个 json 字典,包含一个必选的 host
字段和一个可选的 auth
字段。
{
"proxy": {
"host": "http://127.0.0.1:8080",
"auth": "user:pass"
}
}
如果是 sock
代理 (V4,v5),则可以写
{
"proxy": {
"host": "socks://127.0.0.1:8080",
"auth": "user:pass"
}
}
host
是代理地址,包含代理类型,目前仅支持 http
代理和 socks
(V4, V5均可) 代理。 如果代理需要认证,则可以配置 auth
, 它由 用户名和密码组成,两者之间用冒号(:) 隔开。
限制说明#
返回的结果必须是JSON类型
当前所有key的值均当作字符串类型
暂不支持接口Token鉴权模式
暂不支持分页获取
代理仅支持
http
模式
InfluxDB Reader#
InfluxDBReader 插件实现了从 InfluxDB 读取数据。底层实现上,是通过调用 InfluQL 语言查询表,然后获得返回数据。
示例#
以下示例用来演示该插件如何从指定表(即指标)上读取数据并输出到终端
创建需要的库表和数据#
通过以下命令来创建需要读取的表以及数据
# create database
influx --execute "CREATE DATABASE NOAA_water_database"
# download sample data
curl https://s3.amazonaws.com/noaa.water-database/NOAA_data.txt -o NOAA_data.txt
# import data via influx-cli
influx -import -path=NOAA_data.txt -precision=s -database=NOAA_water_database
创建 job 文件#
创建 job/influxdb2stream.json
文件,内容如下:
{
"job": {
"content": [
{
"reader": {
"name": "influxdbreader",
"parameter": {
"column": [
"*"
],
"connection": [
{
"endpoint": "http://localhost:8086",
"database": "NOAA_water_database",
"table": "h2o_feet",
"where": "1=1"
}
],
"connTimeout": 15,
"readTimeout": 20,
"writeTimeout": 20,
"username": "influx",
"password": "influx123"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
}
}
],
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
}
}
}
参数说明#
配置项 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
endpoint | 是 | string | 无 | |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
database | 是 | string | 无 | 数据源指定的数据库 |
table | 是 | string | 无 | 所选取的需要同步的表名 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmreader |
connTimeout | 否 | int | 15 | 设置连接超时值,单位为秒 |
readTimeout | 否 | int | 20 | 设置读取超时值,单位为秒 |
writeTimeout | 否 | int | 20 | 设置写入超时值,单位为秒 |
where | 否 | 无 | 针对表的筛选条件 | |
querySql | 否 | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
类型转换#
当前实现是将所有字段当作字符串处理
限制#
当前插件仅支持 1.x 版本,2.0 及以上并不支持
JsonFile Reader#
JsonFileReader 提供了读取本地文件系统数据存储的能力。
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
},
"reader": {
"name": "jsonfilereader",
"parameter": {
"path": [
"/tmp/test*.json"
],
"column": [
{
"index": "$.id",
"type": "long"
},
{
"index": "$.name",
"type": "string"
},
{
"index": "$.age",
"type": "long"
},
{
"index": "$.score.math",
"type": "double"
},
{
"index": "$.score.english",
"type": "double"
},
{
"index": "$.pubdate",
"type": "date"
},
{
"type": "string",
"value": "constant string"
}
]
}
}
}
]
}
}
其中 /tmp/test*.json
为同一个 json 文件的多个复制,内容如下:
{
"name": "zhangshan",
"id": 19890604,
"age": 12,
"score": {
"math": 92.5,
"english": 97.5,
"chinese": 95
},
"pubdate": "2020-09-05"
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
path | 是 | 无 | 本地文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下文 |
column | 是 | 无 | 读取字段列表,type指定源数据的类型,详见下文 |
fieldDelimiter | 是 | , |
描述:读取的字段分隔符 |
compress | 否 | 无 | 文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、gzip、bzip2 |
encoding | 否 | utf-8 | 读取文件的编码配置 |
path#
本地文件系统的路径信息,注意这里可以支持填写多个路径
当指定单个本地文件,JsonFileReader暂时只能使用单线程进行数据抽取。
当指定多个本地文件,JsonFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,JsonFileReader尝试遍历出多个文件信息。例如: 指定
/*
代表读取/目录下所有的文件,指定/bazhen/*
代表读取bazhen目录下游所有的文件。 JsonFileReader目前只支持*
作为文件通配符。
特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。
column#
读取字段列表,type指定源数据的类型,index指定当前列来自于json的指定,语法为 Jayway JsonPath 的语法,value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。 用户必须指定Column字段信息
对于用户指定Column信息,type必须填写,index/value必须选择其一
类型转换#
Addax 内部类型 | 本地文件 数据类型 |
---|---|
Long | Long |
Double | Double |
String | String |
Boolean | Boolean |
Date | Date |
Kudu Reader#
KuduReader 插件利用 Kudu 的java客户端KuduClient进行Kudu的读操作。
配置示例#
我们通过 Trino 的 kudu connector
连接 kudu 服务,然后进行表创建以及数据插入
建表语句以及数据插入语句#
CREATE TABLE kudu.default.users (
user_id int WITH (primary_key = true),
user_name varchar with (nullable=true),
age int with (nullable=true),
salary double with (nullable=true),
longtitue decimal(18,6) with (nullable=true),
latitude decimal(18,6) with (nullable=true),
p decimal(21,20) with (nullable=true),
mtime timestamp with (nullable=true)
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 2
);
insert into kudu.default.users
values
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double),
cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
cast(1.12345678912345678912 as decimal(21,20)),
timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double),
cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)),
cast(1.12345678912345678912 as decimal(21,20)),
timestamp '2021-01-10 03:40:41');
-- ONLY insert primary key value
insert into kudu.default.users(user_id) values (3);
配置#
以下是读取kudu表并输出到终端的配置
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "kudureader",
"parameter": {
"masterAddress": "localhost:7051,localhost:7151,localhost:7251",
"table": "users",
"splitPk": "user_id",
"lowerBound": 1,
"upperBound": 100,
"readTimeout": 5,
"scanTimeout": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
把上述配置文件保存为 job/kudu2stream.json
执行#
执行下面的命令进行采集
bin/addax.sh job/kudu2stream.json
输出结果类似如下(删除了不必需要的内容)
2021-01-10 15:46:59.303 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-01-10 15:46:59.329 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"masterAddress":"localhost:7051,localhost:7151,localhost:7251",
"upperBound":100,
"readTimeout":5,
"lowerBound":1,
"splitPk":"user_id",
"table":"users",
"scanTimeout":10,
"column":[]
},
"name":"kudureader"
},
"writer":{
"parameter":{
"print":true
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"channel":3
}
}
}
3 null null null null null null null
1 wgzhao 18 18888.88 123.282424 23.123456 1.12345678912345678912 2021-01-10 22:40:41
2 anglina 16 23456.12 33.192123 56.654321 1.12345678912345678912 2021-01-10 11:40:41
任务启动时刻 : 2021-01-10 15:46:59
任务结束时刻 : 2021-01-10 15:47:02
任务总计耗时 : 3s
任务平均流量 : 52B/s
记录写入速度 : 0rec/s
读出记录总数 : 2
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
masterAddress | 必须 | string | 无 | Kudu Master集群RPC地址,多个地址用逗号(,)分隔 |
table | 必须 | string | 无 | kudu 表名 |
splitPk | 否 | string | 无 | 并行读取数据分片字段 |
lowerBound | 否 | string | 无 | 并行读取数据分片范围下界 |
upperBound | 否 | string | 无 | 并行读取数据分片范围上界 |
readTimeout | 否 | int | 10 | 读取数据超时(秒) |
scanTimeout | 否 | int | 20 | 数据扫描请求超时(秒) |
column | 否 | list | 无 | 指定要获取的字段,多个字段用逗号分隔,比如 "column":["user_id","user_name","age"] |
where | 否 | list | 无 | 指定其他过滤条件,详见下面描述 |
where#
where
用来定制更多的过滤条件,他是一个数组类型,数组的每个元素都是一个过滤条件,比如
{
"where": ["age > 1", "user_name = 'wgzhao'"]
}
上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 column operator value
column
: 要过滤的字段operator
: 比较符号,当前仅支持=
,>
, '>=',<
,<=
, 其他操作符号当前还不支持value
: 比较值,如果是字符串,可以加上单引号('
), 不加可以,因为实际类型会从数据库表中获取对应字段(column
)的类型,但如果值含有空格,则一定要加上单引号
这里还有其他一些限定,在使用时,要特别注意:
上述三个部分之间至少有一个空格
age>1
,age >1
这种均无效,这是因为我们实际上是把 SQL 风格的过滤提交转换为 Kudu 的 KuduPredicate 类多个过滤条件之间的逻辑与关系(
AND
),暂不支持逻辑或(OR
)关系
类型转换#
Addax 内部类型 | Kudu 数据类型 |
---|---|
Long | byte, short, int, long |
Double | float, double, decimal |
String | string |
Date | timestamp |
Boolean | boolean |
Bytes | binary |
MongoDB Reader#
MongoDBReader 插件利用 MongoDB 的java客户端MongoClient进行MongoDB的读操作。
配置样例#
该示例从MongoDB中读一张表并打印到终端
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"127.0.0.1:32768"
],
"userName": "",
"userPassword": "",
"dbName": "tag_per_data",
"collectionName": "tag_data",
"column": [
{
"name": "unique_id",
"type": "string"
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "Array",
"spliter": ""
},
{
"name": "categoryid",
"type": "Array",
"spliter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "Array",
"spliter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": "true"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
address | 是 | 无 | MongoDB的数据地址信息,因为 MonogDB 可能是个集群,则ip端口信息需要以Json数组的形式给出 |
userName | 否 | 无 | MongoDB的用户名 |
userPassword | 否 | 无 | MongoDB的密码 |
collectionName | 是 | 无 | MongoDB的集合名 |
column | 是 | 无 | MongoDB的文档列名 |
name | 是 | 无 | Column的名字 |
type | 否 | 无 | Column的类型 |
splitter | 否 | 无 | 指定 MongoDB数组转为字符串的分隔符 |
类型转换#
Addax 内部类型 | MongoDB 数据类型 |
---|---|
Long | int, Long |
Double | double |
String | string, array |
Date | date |
Boolean | boolean |
Bytes | bytes |
Mysql Reader#
MysqlReader 插件实现了从Mysql读取数据
示例#
我们在 MySQL 的 test 库上创建如下表:
CREATE TABLE addax_reader
(
c_bigint bigint,
c_varchar varchar(100),
c_timestamp timestamp,
c_text text,
c_decimal decimal(8, 3),
c_mediumtext mediumtext,
c_longtext longtext,
c_int int,
c_time time,
c_datetime datetime,
c_enum enum('one', 'two', 'three'),
c_float float,
c_smallint smallint,
c_bit bit,
c_double double,
c_blob blob,
c_char char(5),
c_varbinary varbinary(100),
c_tinyint tinyint,
c_json json,
c_set SET ('a', 'b', 'c', 'd'),
c_binary binary,
c_longblob longblob,
c_mediumblob mediumblob
);
然后插入下面一条记录
INSERT INTO addax_reader
VALUES (2E18,
'a varchar data',
'2021-12-12 12:12:12',
'a long text',
12345.122,
'a medium text',
'a long text',
2 ^ 32 - 1,
'12:13:14',
'2021-12-12 12:13:14',
'one',
17.191,
126,
0,
1114.1114,
'blob',
'a123b',
'a var binary content',
126,
'{"k1":"val1","k2":"val2"}',
'b',
binary(1),
x '89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200',
x '89504E470D0A1A0A0000000D');
下面的配置是读取该表到终端的作业:
{
"job": {
"setting": {
"speed": {
"channel": 3,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"connection": [
{
"table": [
"addax_reader"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test"
],
"driver": "com.mysql.jdbc.Driver"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述配置文件保存为 job/mysql2stream.json
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | list | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
driver | 否 | string | 无 | 自定义驱动类名,解决兼容性问题,详见下面描述 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | list | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述 rdbmreader |
splitPk | 否 | string | 无 | 使用splitPk代表的字段进行数据分片,详细描述见 rdbmreader |
autoPk | 否 | bool | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | string | 无 | 针对表的筛选条件 |
querySql | 否 | list | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
driver#
当前 Addax 采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver
,而不是 com.mysql.jdbc.Driver
。 如果你需要采集的 MySQL 服务低于 5.6
,需要使用到 Connector/J 5.1
驱动,则可以采取下面的步骤:
替换插件内置的驱动
rm -f plugin/reader/mysqlreader/lib/mysql-connector-java-*.jar
拷贝老的驱动到插件目录
cp mysql-connector-java-5.1.48.jar plugin/reader/mysqlreader/lib/
指定驱动类名称
在你的 json 文件类,配置 "driver": "com.mysql.jdbc.Driver"
类型转换#
目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出MysqlReader针对Mysql类型转换列表:
Addax 内部类型 | MySQL 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
请注意:
除上述罗列字段类型外,其他类型均不支持
tinyint(1)
Addax视作为整形year
Addax视作为字符串类型bit
Addax属于未定义行为
数据库编码问题#
Mysql本身的编码设置非常灵活,包括指定编码到库、表、字段级别,甚至可以均不同编码。优先级从高到低为字段、表、库、实例。我们不推荐数据库用户设置如此混乱的编码,最好在库级别就统一到UTF-8。
MysqlReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此MysqlReader不需用户指定编码,可以自动获取编码并转码。
对于Mysql底层写入编码和其设定的编码不一致的混乱情况,MysqlReader对此无法识别,对此也无法提供解决方案,对于这类情况,导出有可能为乱码
。
Oracle Reader#
OracleReader 插件用于从Oracle读取数据
配置样例#
配置一个从Oracle数据库同步抽取数据到本地的作业:
{
"job": {
"setting": {
"speed": {
"byte": 1048576,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "oraclereader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:oracle:thin:@<HOST_NAME>:PORT:<DATABASE_NAME>"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmsreader |
splitPk | 否 | 无 | 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能 |
autoPk | 否 | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | 无 | 针对表的筛选条件 |
querySql | 否 | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
fetchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM |
session | 否 | 无 | 针对本地连接,修改会话配置,详见下文 |
session#
控制写入数据的时间格式,时区等的配置,如果表中有时间字段,配置该值以明确告知写入 oracle 的时间格式。通常配置的参数为:NLS_DATE_FORMAT
,NLS_TIME_FORMAT
。其配置的值为 json
格式,例如:
{
"session": [
"alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set TIME_ZONE='Asia/Chongqing'"
]
}
注意 "
是 "
的转义字符串
类型转换#
Addax 内部类型 | Oracle 数据类型 |
---|---|
Long | NUMBER, INTEGER, INT, SMALLINT |
Double | NUMERIC, DECIMAL, FLOAT, DOUBLE PRECISION, REAL |
String | LONG ,CHAR, NCHAR, VARCHAR, VARCHAR2, NVARCHAR2, CLOB, NCLOB, CHARACTER |
String | CHARACTER VARYING, CHAR VARYING, NATIONAL CHARACTER, NATIONAL CHAR, NATIONAL CHARACTER VARYING |
String | NATIONAL CHAR VARYING, NCHAR VARYING |
Date | TIMESTAMP, DATE |
Boolean | bit, bool |
Bytes | BLOB, BFILE, RAW, LONG RAW |
请注意: 除上述罗列字段类型外,其他类型均不支持
数据库编码问题#
OracleReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此OracleReader不需用户指定编码,可以自动获取编码并转码。
对于Oracle底层写入编码和其设定的编码不一致的混乱情况,OracleReader对此无法识别,对此也无法提供解决方案,对于这类情况,导出有可能为乱码。
Postgresql Reader#
PostgresqlReader 插件用于从 PostgreSQL 读取数据
示例#
假定建表语句以及输入插入语句如下:
create table if not exists addax_tbl
(
c_bigint
bigint,
c_bit
bit(3),
c_bool boolean,
c_byte bytea,
c_char char(10),
c_varchar varchar(20),
c_date date,
c_double float8,
c_int integer,
c_json json,
c_number decimal(8, 3),
c_real real,
c_small smallint,
c_text text,
c_ts timestamp,
c_uuid uuid,
c_xml xml,
c_money money,
c_inet inet,
c_cidr cidr,
c_macaddr macaddr
);
insert into addax_tbl
values (999988887777,
B '101',
TRUE,
'\xDEADBEEF',
'hello',
'hello, world',
'2021-01-04',
999888.9972,
9876542,
'{"bar": "baz", "balance": 7.77, "active": false}'::json,
12345.123,
123.123,
126,
'this is a long text ',
'2020-01-04 12:13:14',
'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11'::uuid,
'<foo>bar</foo>'::xml,
'52093.89'::money,
'192.168.1.1'::inet,
'192.168.1/24'::cidr,
'08002b:010203'::macaddr);
配置一个从PostgreSQL数据库同步抽取数据到本地的作业:
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "pgtest",
"password": "pgtest",
"column": [
"*"
],
"connection": [
{
"table": [
"addax_tbl"
],
"jdbcUrl": [
"jdbc:postgresql://127.0.0.1:5432/pgtest"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述配置文件保存为 job/postgres2stream.json
执行采集命令#
执行以下命令进行数据采集
bin/addax.sh job/postgres2stream.json
其输出信息如下(删除了非关键信息)
2021-01-07 10:15:12.295 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"password":"*****",
"column":[
"*"
],
"connection":[
{
"jdbcUrl":[
"jdbc:postgresql://localhost:5432/pgtest"
],
"table":[
"addax_tbl"
]
}
],
"username":"pgtest"
},
"name":"postgresqlreader"
},
"writer":{
"parameter":{
"print":true
},
"name":"streamwriter"
}
}
],
"setting":{
"speed":{
"byte":-1,
"channel":1
}
}
}
999988887777 101 true ޭ�� hello hello, world 2021-01-04 999888.99719999998 9876542 {"bar": "baz", "balance": 7.77, "active": false} 12345.123 123.123 126 this is a long text 2020-01-04 12:13:14 a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11 <foo>bar</foo> 52093.89 192.168.1.1 192.168.1.0/24 08:00:2b:01:02:03
任务启动时刻 : 2021-01-07 10:15:12
任务结束时刻 : 2021-01-07 10:15:15
任务总计耗时 : 3s
任务平均流量 : 90B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接 附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmsreader |
splitPk | 否 | 无 | 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能 |
autoPk | 否 | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | 无 | 针对表的筛选条件 |
querySql | 否 | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
fetchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM |
类型转换#
Addax 内部类型 | PostgreSQL 数据类型 |
---|---|
Long | bigint, bigserial, integer, smallint, serial |
Double | double precision, money, numeric, real |
String | varchar, char, text, bit(>1), inet, cidr, macaddr, array,uuid,json,xml |
Date | date, time, timestamp |
Boolean | bool,bit(1) |
Bytes | bytea |
已知限制#
除上述罗列字段类型外,其他类型均不支持;
RDBMS Reader#
RDBMSReader 插件支持从传统 RDBMS 读取数据。这是一个通用关系数据库读取插件,可以通过注册数据库驱动等方式支持更多关系数据库读取。
同时 RDBMS Reader 又是其他关系型数据库读取插件的的基础类。以下读取插件均依赖该插件
注意, 如果已经提供了专门的数据库读取插件的,推荐使用专用插件,如果你需要读取的数据库没有专门插件,则考虑使用该通用插件。 在使用之前,还需要执行以下操作才可以正常运行,否则运行会出现异常。
配置驱动#
假定你需要读取 IBM DB2 的数据,因为没有提供专门的读取插件,所以我们可以使用该插件来实现,在使用之前,需要执行下面两个操作:
下载对应的 JDBC 驱动,并拷贝到
plugin/reader/rdbmsreader/libs
目录修改
plugin/reader/rdbmsreader/plugin.json
文件,找到drivers
一项,填写正确的 JDBC 驱动名,比如 DB2 的驱动名为com.ibm.db2.jcc.DB2Driver
,类似这样:{ "name": "rdbmsreader", "class": "com.wgzhao.addax.plugin.reader.rdbmsreader.RdbmsReader", "description": "", "developer": "wgzhao", "drivers": ["com.ibm.db2.jcc.DB2Driver"] }
以下列出常见的数据库以及对应的驱动名称
Apache Impala:
com.cloudera.impala.jdbc41.Driver
Enterprise DB:
com.edb.Driver
PrestoDB:
com.facebook.presto.jdbc.PrestoDriver
IBM DB2:
com.ibm.db2.jcc.DB2Driver
MySQL:
com.mysql.cj.jdbc.Driver
Sybase Server:
com.sybase.jdbc3.jdbc.SybDriver
TDengine:
com.taosdata.jdbc.TSDBDriver
达梦数据库:
dm.jdbc.driver.DmDriver
星环Inceptor:
io.transwarp.jdbc.InceptorDriver
TrinoDB:
io.trino.jdbc.TrinoDriver
PrestoSQL:
io.prestosql.jdbc.PrestoDriver
Oracle DB:
oracle.jdbc.OracleDriver
PostgreSQL:
org.postgresql.Drive
配置说明#
以下配置展示了如何从 Presto 数据库读取数据到终端
{
"job": {
"setting": {
"speed": {
"byte": 1048576,
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "hive",
"password": "",
"column": [
"*"
],
"connection": [
{
"table": [
"default.table"
],
"jdbcUrl": [
"jdbc:presto://127.0.0.1:8080/hive"
],
"driver": ""
}
],
"fetchSize": 1024,
"where": "1 = 1"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | array | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
driver | 否 | string | 无 | 自定义驱动类名,解决兼容性问题,详见下面描述 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | array | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | array | 无 | 所配置的表中需要同步的列名集合,详细描述见后 |
splitPk | 否 | string | 无 | 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能,注意事项见后 |
autoPk | 否 | bool | false | 是否自动猜测分片主键,3.2.6 版本引入,详见后面描述 |
where | 否 | string | 无 | 针对表的筛选条件 |
querySql | 否 | string | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
fetchSize | 否 | int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM |
jdbcUrl#
jdbcUrl
配置除了配置必要的信息外,我们还可以在增加每种特定驱动的特定配置属性,这里特别提到我们可以利用配置属性对代理的支持从而实现通过代理访问数据库的功能。 比如对于 PrestoSQL 数据库的 JDBC 驱动而言,支持 socksProxy
参数,于是上述配置的 jdbcUrl
可以修改为
jdbc:presto://127.0.0.1:8080/hive?socksProxy=192.168.1.101:1081
大部分关系型数据库的 JDBC 驱动支持 socksProxyHost,socksProxyPort
参数来支持代理访问。也有一些特别的情况。
以下是各类数据库 JDBC 驱动所支持的代理类型以及配置方式
数据库 | 代理类型 | 代理配置 | 例子 |
---|---|---|---|
MySQL | socks | socksProxyHost,socksProxyPort | socksProxyHost=192.168.1.101&socksProxyPort=1081 |
Presto | socks | socksProxy | socksProxy=192.168.1.101:1081 |
Presto | http | httpProxy | httpProxy=192.168.1.101:3128 |
driver#
大部分情况下,一个数据库的JDBC驱动是固定的,但有些因为版本的不同,所建议的驱动类名不同,比如 MySQL。 新的 MySQL JDBC 驱动类型推荐使用 com.mysql.cj.jdbc.Driver
而不是以前的 com.mysql.jdbc.Drver
。如果想要使用就的驱动名称,则可以配置 driver
配置项。
column#
所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用 *
代表默认使用所有列配置,例如 ["*"]
。
支持列裁剪,即列可以挑选部分列进行导出。
支持列换序,即列可以不按照表schema信息进行导出。
支持常量配置,用户需要按照JSON格式:
["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
id
为普通列名`table`
为包含保留在的列名,1
为整形数字常量,'bazhen.csy'
为字符串常量null
为空指针,注意,这里的null
必须以字符串形式出现,即用双引号引用to_char(a + 1)
为表达式,2.3
为浮点数,true
为布尔值,同样的,这里的布尔值也必须用双引号引用
Column必须显示填写,不允许为空!
splitPk#
如果指定 splitPk
,表示用户希望使用 splitPk
代表的字段进行数据分片,因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。
推荐 splitPk
用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
目前 splitPk
仅支持整形、字符串型数据(ASCII类型) 切分,不支持浮点、日期等其他类型。 如果用户指定其他非支持类型,RDBMSReader 将报错!
splitPk
如果不填写,将视作用户不对单表进行切分,而使用单通道同步全量数据。
autoPk#
从 3.2.6
版本开始,支持自动获取表主键或唯一索引,如果设置为 true
,将尝试通过查询数据库的元数据信息获取指定表的主键字段或唯一索引字段,如果获取可用于分隔的 字段不止一个,则默认取第一个。
该特性目前支持的数据库有:
ClickHouse
MySQL
Oracle
PostgreSQL
SQL Server
类型转换#
Addax 内部类型 | RDBMS 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year,xml |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
当前支持的数据库#
Redis Reader#
RedisReader 插件用于读取 Redis RDB 数据
配置样例#
{
"job": {
"content": [
{
"reader": {
"name": "redisreader",
"parameter": {
"connection": [
{
"uri": "file:///root/dump.rdb",
"uri": "http://localhost/dump.rdb",
"uri": "tcp://127.0.0.1:7001",
"uri": "tcp://127.0.0.1:7002",
"uri": "tcp://127.0.0.1:7003",
"auth": "password"
}
],
"include": [
"^user"
],
"exclude": [
"^password"
],
"db": [
0,
1
]
}
},
"writer": {
"name": "rediswriter",
"parameter": {
"connection": [
{
"uri": "tcp://127.0.0.1:6379",
"auth": "123456"
}
],
"timeout": 60000
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
uri | 是 | 否 | redis链接,支持多个本地rdb文件/网络rdb文件,如果是集群,填写所有master节点地址 |
db | 否 | 无 | 需要读取的db索引,若不填写,则读取所有db |
include | 否 | 无 | 要包含的 key, 支持正则表达式 |
exclude | 否 | 无 | 要排除的 key,支持正则表达式 |
约束限制#
不支持直接读取任何不支持
sync
命令的 redis server,如果需要请备份的rdb文件进行读取。如果是原生redis cluster集群,请填写所有master节点的tcp地址,
redisreader
插件会自动dump 所有节点的rdb文件。仅解析
String
数据类型,其他复合类型(Sets
,List
等会忽略)
SQLite Reader#
SQLiteReader 插件用于读取指定目录下的 sqlite 文件, 他继承于 rdbmsreader
示例#
我们创建示例文件:
$ sqlite3 /tmp/test.sqlite3
SQLite version 3.7.17 2013-05-20 00:56:22
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite> create table test(id int, name varchar(10), salary double);
sqlite> insert into test values(1,'foo', 12.13),(2,'bar',202.22);
sqlite> .q
下面的配置是读取该表到终端的作业:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "sqlitereader",
"parameter": {
"username": "fakeuser",
"password": "",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:sqlite:/tmp/test.sqlite3"
],
"table": [
"test"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述配置文件保存为 job/sqlite2stream.json
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | list | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
driver | 否 | string | 无 | 自定义驱动类名,解决兼容性问题,详见下面描述 |
username | 是 | string | 无 | 数据源的用户名, 可随意配置,但不能缺失 |
password | 否 | string | 无 | 数据源指定用户名的密码,可不配置 |
table | 是 | list | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述 rdbmreader |
splitPk | 否 | string | 无 | 使用splitPk代表的字段进行数据分片,详细描述见 rdbmreader |
autoPk | 否 | bool | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | string | 无 | 针对表的筛选条件 |
querySql | 否 | list | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
类型转换#
Addax 内部类型 | MySQL 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
SqlServer Reader#
SqlServerReader插件用于从从SqlServer读取数据。
配置样例#
配置一个从SqlServer数据库同步抽取数据到本地的作业:
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"*"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:sqlserver://localhost:3433;DatabaseName=dbname"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmsreader |
splitPk | 否 | 无 | 使用splitPk代表的字段进行数据分片,详细描述见 rdbmsreader |
autoPk | 否 | false | 是否自动猜测分片主键,3.2.6 版本引入 |
where | 否 | 无 | 针对表的筛选条件 |
querySql | 否 | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
fetchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM |
类型转换#
Addax 内部类型 | SqlServer 数据类型 |
---|---|
Long | bigint, int, smallint, tinyint |
Double | float, decimal, real, numeric |
String | char,nchar,ntext,nvarchar,text,varchar,nvarchar(MAX),varchar(MAX) |
Date | date, datetime, time |
Boolean | bit |
Bytes | binary,varbinary,varbinary(MAX),timestamp, image |
请注意:
除上述罗列字段类型外,其他类型均不支持
timestamp类型作为二进制类型
Stream Reader#
StreamReader 是一个从内存读取数据的插件, 他主要用来快速生成期望的数据并对写入插件进行测试
一个完整的 StreamReader 配置文件如下:
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "unique_id",
"type": "string"
},
{
"value": "1989-06-04 08:12:13",
"type": "date",
"dateFormat": "yyyy-MM-dd HH:mm:ss"
},
{
"value": 1984,
"type": "long"
},
{
"value": 1989.64,
"type": "double"
},
{
"value": true,
"type": "bool"
},
{
"value": "a long text",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
}
}
上述配置文件将会生成 10条记录(假定channel为1),每条记录的内容如下:
unique_id,'1989-06-04 08:12:13',1984,1989.64,true,'a long text'
目前 StreamReader 支持的输出数据类型全部列在上面,分别是:
string
字符类型date
日期类型long
所有整型类型double
所有浮点数bool
布尔类型bytes
字节类型
其中 date
类型还支持 dateFormat
配置,用来指定输入的日期的格式,默认为 yyyy-MM-dd HH:mm:ss
。比如你的输入可以这样:
{
"value": "1989/06/04 12:13:14",
"type": "date",
"dateFormat": "yyyy/MM/dd HH:mm:ss"
}
注意,日期类型不管输入是何种格式,内部都转为 yyyy-MM-dd HH:mm:ss
格式。
StreamReader 还支持随机输入功能,比如我们要随机得到0-10之间的任意一个整数,我们可以这样配置列:
{
"random": "0,10",
"type": "long"
}
获得一个 0 至 100 之间的随机浮点数,可以这样配置:
{
"random": "0,100",
"type": "double"
}
如果要指定浮点数的小数位数,比如指定小数位为2位,则可以这样设定
{
"random": "0,100,2",
"type": "double"
}
注意: 并不能保证每次生成的小数恰好是2位,如果小数为数为0 ,则小数位数会少于指定的位数。
这里使用 random
这个关键字来表示其值为随机值,其值的范围为左右闭区间。
其他类型的随机类型配置如下:
long
: random 0, 10 0到10之间的随机数字string
: random 0, 10 0到 10 长度之间的随机字符串bool
: random 0, 10 false 和 true出现的比率double
: random 0, 10 0到10之间的随机浮点数double
: random 0, 10, 2 0到10之间的随机浮点数,小数位为2位date
: random '2014-07-07 00:00:00', '2016-07-07 00:00:00' 开始时间->结束时间之间的随机时间,日期格式默认(不支持逗号)yyyy-MM-dd HH:mm:ssBYTES
: random 0, 10 0到10长度之间的随机字符串获取其UTF-8编码的二进制串
StreamReader 还支持递增函数,比如我们要得到一个从1开始,每次加5的等差数列,可以这样配置:
{
"incr": "1,5",
"type": "long"
}
如果需要获得一个递减的数列,则把第二个参数的步长(上例中的5)改为负数即可。步长默认值为1。
递增还支持日期类型( 4.0.1
版本引入),比如下面的配置:
{
"incr": "1989-06-04 09:01:02,2,d",
"type": "date"
}
incr
由三部分组成,分别是开始日期,步长以及步长单位,中间用英文逗号(,)分隔。
开始日期:正确的日期字符串,默认格式为
yyyy-MM-dd hh:mm:ss
,如果时间格式不同,则需要配置dateFormat
来指定日期格式,这是必填项步长:每次需要增加的长度,默认为1,如果希望是递减,则填写负数,这是可选项
步长单位:按什么时间单位进行递增/递减,默认为按天(day),这是可选项,可选的单位有
d/day
M/month
y/year
h/hour
m/minute
s/second
w/week
配置项 sliceRecordCount
用来指定要生成的数据条数,如果指定的 channel
,则实际生成的记录数为 sliceRecordCount * channel
TDengine Reader#
TDengineReader 插件用于从涛思公司的 TDengine 读取数据。
前置条件#
考虑到性能问题,该插件使用了 TDengine 的 JDBC-JNI 驱动, 该驱动直接调用客户端 API(libtaos.so
或 taos.dll
)将写入和查询请求发送到 taosd 实例。因此在使用之前需要配置好动态库链接文件。
首先将 plugin/reader/tdenginereader/libs/libtaos.so.2.0.16.0
拷贝到 /usr/lib64
目录,然后执行下面的命令创建软链接
ln -sf /usr/lib64/libtaos.so.2.0.16.0 /usr/lib64/libtaos.so.1
ln -sf /usr/lib64/libtaos.so.1 /usr/lib64/libtaos.so
示例#
TDengine 数据自带了一个演示数据库 taosdemo , 我们从演示数据库读取部分数据并打印到终端
以下是配置文件
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"jdbcUrl": [
"jdbc:TAOS://127.0.0.1:6030/test"
],
"querySql": [
"select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 10"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
]
}
}
将上述配置文件保存为 job/tdengine2stream.json
执行采集命令#
执行以下命令进行数据采集
bin/addax.sh job/tdengine2stream.json
命令输出类似如下:
2021-02-20 15:32:23.161 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-20 15:32:23.229 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"password":"*****",
"connection":[
{
"querySql":[
"select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 100"
],
"jdbcUrl":[
"jdbc:TAOS://127.0.0.1:6030/test"
]
}
],
"username":"root"
},
"name":"tdenginereader"
},
"writer":{
"parameter":{
"print":true
},
"name":"streamwriter"
}
}
],
"setting":{
"errorLimit":{
"record":0,
"percentage":0.02
},
"speed":{
"channel":3
}
}
}
2021-02-20 15:32:23.277 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-20 15:32:23.278 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-20 15:32:23.281 [main] INFO JobContainer - Set jobId = 0
java.library.path:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
....
2021-02-20 15:32:23.687 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 100
] jdbcUrl:[jdbc:TAOS://127.0.0.1:6030/test].
2021-02-20 15:32:23.692 [0-0-0-reader] WARN DBUtil - current database does not supoort TYPE_FORWARD_ONLY/CONCUR_READ_ONLY
2021-02-20 15:32:23.740 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 100
] jdbcUrl:[jdbc:TAOS://127.0.0.1:6030/test].
1500000001000 5 5 0 1 beijing
1500000001000 0 6 2 1 beijing
1500000001000 7 0 0 1 beijing
1500000001000 8 9 6 1 beijing
1500000001000 9 9 1 1 beijing
1500000001000 8 2 0 1 beijing
1500000001000 4 5 5 3 beijing
1500000001000 3 3 3 3 beijing
1500000001000 5 4 8 3 beijing
1500000001000 9 4 6 3 beijing
2021-02-20 15:32:26.689 [job-0] INFO JobContainer -
任务启动时刻 : 2021-02-20 15:32:23
任务结束时刻 : 2021-02-20 15:32:26
任务总计耗时 : 3s
任务平均流量 : 800B/s
记录写入速度 : 33rec/s
读出记录总数 : 100
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | list | 无 | 对端数据库的JDBC连接信息,注意这里的 TAOS 必须大写 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | list | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述rdbmreader |
where | 否 | string | 无 | 针对表的筛选条件 |
querySql | 否 | list | 无 | 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table ,column 这些配置项 |
使用 JDBC-RESTful 接口#
如果不想依赖本地库,或者没有权限,则可以使用 JDBC-RESTful
接口来写入表,相比 JDBC-JNI 而言,配置区别是:
driverClass 指定为
com.taosdata.jdbc.rs.RestfulDriver
jdbcUrl 以
jdbc:TAOS-RS://
开头;使用
6041
作为连接端口
所以上述配置中的connection
应该修改为如下:
{
"connection": [
{
"querySql": [
"select * from test.meters where ts <'2017-07-14 10:40:02' and loc='beijing' limit 100"
],
"jdbcUrl": [
"jdbc:TAOS-RS://127.0.0.1:6041/test"
],
"driver": "com.taosdata.jdbc.rs.RestfulDriver"
}
]
}
类型转换#
Addax 内部类型 | TDengine 数据类型 |
---|---|
Long | SMALLINT, TINYINT, INT, BIGINT, TIMESTAMP |
Double | FLOAT, DOUBLE |
String | BINARY, NCHAR |
Boolean | BOOL |
当前支持版本#
TDengine 2.0.16
注意事项#
TDengine JDBC-JNI 驱动和动态库版本要求一一匹配,因此如果你的数据版本并不是
2.0.16
,则需要同时替换动态库和插件目录中的JDBC驱动
TxtFile Reader#
TxtFileReader 提供了读取本地文件系统数据存储的能力。
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/tmp/data"
],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "date",
"format": "yyyy.MM.dd"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/tmp/result",
"fileName": "txt_",
"writeMode": "truncate",
"format": "yyyy-MM-dd"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
path | 是 | 无 | 本地文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下文 |
column | 是 | 无 | 读取字段列表,type指定源数据的类型,详见下文 |
fieldDelimiter | 是 | , |
描述:读取的字段分隔符 |
encoding | 否 | utf-8 | 读取文件的编码配置 |
skipHeader | 否 | false | 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过 |
csvReaderConfig | 否 | 无 | 读取CSV类型文件参数配置,Map类型。不配置则使用默认值,详见下文 |
path#
本地文件系统的路径信息,注意这里可以支持填写多个路径。
当指定单个本地文件,TxtFileReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取
当指定多个本地文件,TxtFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定
当指定通配符,TxtFileReader尝试遍历出多个文件信息。例如: 指定
/*
代表读取/
目录下所有的文件,指定/bazhen/*
代表读取bazhen
目录下游所有的文件。目前只支持*
作为文件通配符。
特别需要注意的是,Addax会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类CSV格式,并且提供给Addax权限可读。
特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。
从 3.2.3 版本起, path
下允许混合不同压缩格式的文件,插件会尝试自动猜测压缩格式并自动解压,目前支持的压缩格式有:
zip
bzip2
gzip
LZ4
PACK200
XZ
Compress
column#
读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。
默认情况下,用户可以全部按照String类型读取数据,配置如下:
{
"column": [
"*"
]
}
用户可以指定Column字段信息,配置如下:
[
{
"type": "long",
"index": 0
},
{
"type": "string",
"value": "alibaba"
}
]
对于用户指定Column信息,type必须填写,index/value必须选择其一。
从 4.0.1
开始,表示字段除了使用 index
来指定字段的顺序外,还支持 name
方式,这需要所读取的文件的都包含了文件头,插件会尝试将指定的 name
去匹配从文件读取的文件头,
然后得到对应的 index
值,并回写到 配置文件中。同时,index
和 name
可以在不同的列上进行混合使用,比如下面这样:
[
{
"type": "long",
"index": 0
},
{
"name": "region",
"type": "string"
},
{
"type": "string",
"value": "alibaba"
}
]
注: 这种方式以为这在准备阶段就要尝试读取文件,因为会有一定的性能损失,如非必要,不建议配置 name
方式。
csvReaderConfig#
读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。
常见配置:
{
"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
}
所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
类型转换#
Addax 内部类型 | 本地文件 数据类型 |
---|---|
Long | Long |
Double | Double |
String | String |
Boolean | Boolean |
Date | Date |
写入插件#
本章描述 Addax 目前支持的数据写入插件
Cassandra Writer#
CassandraWriter 插件用于向 Cassandra 写入数据。
配置样例#
配置一个从内存产生到Cassandra导入的作业:
{
"job": {
"setting": {
"speed": {
"channel": 5,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "name",
"type": "string"
},
{
"value": "false",
"type": "bool"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": "addr",
"type": "bytes"
},
{
"value": 1.234,
"type": "double"
},
{
"value": 12345678,
"type": "long"
},
{
"value": 2.345,
"type": "double"
},
{
"value": 3456789,
"type": "long"
},
{
"value": "4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5",
"type": "string"
},
{
"value": "value",
"type": "bytes"
},
{
"value": "-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148",
"type": "string"
}
],
"sliceRecordCount": 10000000
}
},
"writer": {
"name": "cassandrawriter",
"parameter": {
"host": "localhost",
"port": 9042,
"useSSL": false,
"keyspace": "stresscql",
"table": "dst",
"batchSize": 10,
"column": [
"name",
"choice",
"date",
"address",
"dbl",
"lval",
"fval",
"ival",
"uid",
"value",
"listval"
]
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
host | 是 | 无 | Cassandra连接点的域名或ip,多个node之间用逗号分隔 |
port | 是 | 9042 | Cassandra端口 |
username | 否 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
useSSL | 否 | false | 是否使用SSL连接 |
connectionsPerHost | 否 | 8 | 客户端连接池配置:与服务器每个节点建多少个连接 |
maxPendingPerConnection | 否 | 128 | 客户端连接池配置:每个连接最大请求数 |
keyspace | 是 | 无 | 需要同步的表所在的keyspace |
table | 是 | 无 | 所选取的需要同步的表 |
column | 是 | 无 | 所配置的表中需要同步的列集合,内容可以是列的名称或 writetime() 。如果将列名配置为 writetime() ,会将这一列的内容作为时间戳 |
consistancyLevel | 否 | LOCAL_QUORUM | 数据一致性级别, 可选 ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY, TWO, THREE, LOCAL_ONE |
batchSize | 否 | 1 | 一次批量提交(UNLOGGED BATCH)的记录数大小(条数) |
类型转换#
Addax 内部类型 | Cassandra 数据类型 |
---|---|
Long | int, tinyint, smallint,varint,bigint,time |
Double | float, double, decimal |
String | ascii,varchar, text,uuid,timeuuid,duration,list,map,set,tuple,udt,inet |
Date | date, timestamp |
Boolean | bool |
Bytes | blob |
请注意:
目前不支持 counter
类型和 custom
类型。
ClickHouse Writer#
ClickHouseWriter 插件用于了向 ClickHouse 写入数据。
示例#
以下示例我们演示从 clickhouse 中读取一张表的内容,并写入到相同表结构的另外一张表中,用来测试插件所支持的数据结构
表结构以数据#
假定要读取的表结构及数据如下:
CREATE TABLE ck_addax (
c_int8 Int8,
c_int16 Int16,
c_int32 Int32,
c_int64 Int64,
c_uint8 UInt8,
c_uint16 UInt16,
c_uint32 UInt32,
c_uint64 UInt64,
c_float32 Float32,
c_float64 Float64,
c_decimal Decimal(38,10),
c_string String,
c_fixstr FixedString(36),
c_uuid UUID,
c_date Date,
c_datetime DateTime('Asia/Chongqing'),
c_datetime64 DateTime64(3, 'Asia/Chongqing'),
c_enum Enum('hello' = 1, 'world'=2)
) ENGINE = MergeTree() ORDER BY (c_int8, c_int16) SETTINGS index_granularity = 8192;
insert into ck_addax values(
127,
-32768,
2147483647,
-9223372036854775808,
255,
65535,
4294967295,
18446744073709551615,
0.999999999999,
0.99999999999999999,
1234567891234567891234567891.1234567891,
'Hello String',
'2c:16:db:a3:3a:4f',
'5F042A36-5B0C-4F71-ADFD-4DF4FCA1B863',
'2021-01-01',
'2021-01-01 00:00:00',
'2021-01-01 00:00:00',
'hello'
);
要写入的表采取和读取表结构相同,其建表语句如下:
create table ck_addax_writer as ck_addax;
配置#
以下为配置文件
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"writer": {
"name": "clickhousewriter",
"parameter": {
"username": "default",
"column": [
"*"
],
"connection": [
{
"table": [
"ck_addax_writer"
],
"jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default"
}
],
"preSql": ["alter table @table delete where 1=1"]
}
},
"reader": {
"name": "clickhousereader",
"parameter": {
"username": "default",
"column": [
"*"
],
"connection": [
{
"jdbcUrl": [
"jdbc:clickhouse://127.0.0.1:8123/"
],
"table":["ck_addax"]
}
]
}
}
}
]
}
}
将上述配置文件保存为 job/clickhouse2clickhouse.json
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | ClickHouse JDBC 连接信息 ,可按照官方规范填写连接附件控制信息。具体请参看ClickHouse官方文档 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表 ,当配置为多张表时,用户自己需保证多张表是同一schema结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合, 使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 "['*']" |
batchSize | 否 | 2048 | 每次批量数据的条数 |
Dbf Writer#
DbfWriter 提供了向本地文件写入类dbf格式的一个或者多个表文件。
配置样例#
{
"job": {
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "中文测试",
"type": "string"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "dbfwriter",
"parameter": {
"column": [
{
"name": "col1",
"type": "char",
"length": 100
},
{
"name": "col2",
"type": "numeric",
"length": 18,
"scale": 0
},
{
"name": "col3",
"type": "date"
},
{
"name": "col4",
"type": "logical"
},
{
"name": "col5",
"type": "char",
"length": 100
}
],
"fileName": "test.dbf",
"path": "/tmp/out",
"writeMode": "truncate",
"encoding": "GBK"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
path | 是 | 无 | DBF文件目录,注意这里是文件夹,不是文件 |
column | 是 | 类型默认为String | 所配置的表中需要同步的列集合, 是 {type: value} 或 {type: index} 的集合 |
fileName | 是 | 无 | DbfFileWriter写入的文件名 |
writeMode | 是 | 无 | DbfFileWriter写入前数据清理处理模式,支持 truncate , append , nonConflict 三种模式,详见如下 |
encoding | 否 | UTF-8 | DBF文件编码,比如 GBK , UTF-8 |
nullFormat | 否 | \N |
定义哪个字符串可以表示为null, |
dateFormat | 否 | 无 | 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd" |
writeMode#
写入前数据清理处理模式:
truncate: 写入前清理目录下
fileName
前缀的所有文件。append: 写入前不做任何处理,直接使用
filename
写入,并保证文件名不冲突。nonConflict: 如果目录下有
fileName
前缀的文件,直接报错。
类型转换#
当前该插件支持写入的类型以及对应关系如下:
XBase Type | XBase Symbol | Java Type used in JavaDBF |
---|---|---|
Character | C | java.lang.String |
Numeric | N | java.math.BigDecimal |
Floating Point | F | java.math.BigDecimal |
Logical | L | java.lang.Boolean |
Date | D | java.util.Date |
其中:
numeric 是指本地文件中使用数字类型表示形式,例如
19901219
,整形小数位数为0
。logical 是指本地文件文本中使用Boolean的表示形式,例如
true
、false
。Date 是指本地文件文本中使用Date表示形式,例如
2014-12-31
,Date 是JAVA语言的 Date 类型。
Doris Writer#
DorisWriter 插件用于向 Doris 数据库以流式方式写入数据。
其实现上是通过访问 Doris http 连接(8030),然后通过 stream load
加载数据到数据中,相比 insert into
方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。
Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQLReader 进行访问。
示例#
假定要写入的表的建表语句如下:
CREATE DATABASE example_db;
CREATE TABLE example_db.table1
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");
下面配置一个从内存读取数据,然后写入到 doris 表的配置文件
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"writer": {
"name": "doriswriter",
"parameter": {
"username": "test",
"password": "123456",
"batchSize": 1024,
"connection": [
{
"table": "table1",
"database": "example_db",
"endpoint": "http://127.0.0.1:8030/"
}
]
}
},
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random": "1,500",
"type": "long"
},
{
"random": "1,127",
"type": "long"
},
{
"value": "this is a text",
"type": "string"
},
{
"random": "5,200",
"type": "long"
}
],
"sliceRecordCount": 100
}
}
}
]
}
}
将上述配置文件保存为 job/stream2doris.json
执行下面的命令
bin/addax.sh job/stream2doris.json
输出类似如下:
2021-02-23 15:22:57.851 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"column":[
{
"random":"1,500",
"type":"long"
},
{
"random":"1,127",
"type":"long"
},
{
"type":"string",
"value":"username"
}
],
"sliceRecordCount":100
},
"name":"streamreader"
},
"writer":{
"parameter":{
"password":"*****",
"batchSize":1024,
"connection":[
{
"database":"example_db",
"endpoint":"http://127.0.0.1:8030/",
"table":"table1"
}
],
"username":"test"
},
"name":"doriswriter"
}
}
],
"setting":{
"speed":{
"channel":2
}
}
}
2021-02-23 15:22:57.886 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:23:00.941 [job-0] INFO JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO JobContainer -
任务启动时刻 : 2021-02-23 15:22:57
任务结束时刻 : 2021-02-23 15:23:00
任务总计耗时 : 3s
任务平均流量 : 1.56KB/s
记录写入速度 : 66rec/s
读出记录总数 : 200
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
endpoint | 是 | string | 无 | Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 | |
username | 是 | string | 无 | HTTP 签名验证帐号 |
password | 否 | string | 无 | HTTP 签名验证密码 |
table | 是 | string | 无 | 所选取的需要同步的表名 |
column | 否 | list | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter | |
batchSize | 否 | int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
column#
该插件中的 column
不是必须项,如果没有配置该项,或者配置为 ["*"]
, 则按照 reader 插件获取的字段值进行顺序拼装。
否则可以按照如下方式指定需要插入的字段
{
"column": ["siteid","citycode","username"]
}
ElasticSearch Writer#
elasticsearchWriter 插件用于向 ElasticSearch 写入数据。其实现是通过 elasticsearch 的rest api接口, 批量把据写入elasticsearch
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random": "10,1000",
"type": "long"
},
{
"value": "1.1.1.1",
"type": "string"
},
{
"value": 19890604.0,
"type": "double"
},
{
"value": 19890604,
"type": "long"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "hello world",
"type": "string"
},
{
"value": "long text",
"type": "string"
},
{
"value": "41.12,-71.34",
"type": "string"
},
{
"value": "2017-05-25 11:22:33",
"type": "string"
}
],
"sliceRecordCount": 100
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://localhost:9200",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "col_ip",
"type": "ip"
},
{
"name": "col_double",
"type": "double"
},
{
"name": "col_long",
"type": "long"
},
{
"name": "col_integer",
"type": "integer"
},
{
"name": "col_keyword",
"type": "keyword"
},
{
"name": "col_text",
"type": "text",
"analyzer": "ik_max_word"
},
{
"name": "col_geo_point",
"type": "geo_point"
},
{
"name": "col_date",
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
},
{
"name": "col_nested1",
"type": "nested"
},
{
"name": "col_nested2",
"type": "nested"
},
{
"name": "col_object1",
"type": "object"
},
{
"name": "col_object2",
"type": "object"
},
{
"name": "col_integer_array",
"type": "integer",
"array": true
},
{
"name": "col_geo_shape",
"type": "geo_shape",
"tree": "quadtree",
"precision": "10m"
}
]
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
endpoint | 是 | 无 | ElasticSearch的连接地址 |
accessId | 否 | 空 | http auth中的user, 默认为空 |
accessKey | 否 | 空 | http auth中的password |
index | 是 | 无 | index名 |
type | 否 | index名 | index的type名 |
cleanup | 否 | false | 是否删除原表 |
batchSize | 否 | 1000 | 每次批量数据的条数 |
trySize | 否 | 30 | 失败后重试的次数 |
timeout | 否 | 600000 | 客户端超时时间,单位为毫秒(ms) |
discovery | 否 | false | 启用节点发现将(轮询)并定期更新客户机中的服务器列表 |
compression | 否 | true | 否是开启http请求压缩 |
multiThread | 否 | true | 是否开启多线程http请求 |
ignoreWriteError | 否 | false | 写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据 |
ignoreParseError | 否 | true | 解析数据格式错误时,是否继续写入 |
alias | 否 | 无 | 数据导入完成后写入别名 |
aliasMode | 否 | append | 数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个) |
settings | 否 | 无 | 创建index时候的settings, 与elasticsearch官方相同 |
splitter | 否 | , |
如果插入数据是array,就使用指定分隔符 |
column | 是 | 无 | 字段类型,文档中给出的样例中包含了全部支持的字段类型 |
dynamic | 否 | false | 不使用addax的mappings,使用es自己的自动mappings |
约束限制#
如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)
Ftp Writer#
FtpWriter 提供了向远程 FTP/SFTP 服务写入文件的能力,当前仅支持写入文本文件。
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {},
"writer": {
"name": "ftpwriter",
"parameter": {
"protocol": "sftp",
"host": "***",
"port": 22,
"username": "xxx",
"password": "xxx",
"timeout": "60000",
"connectPattern": "PASV",
"path": "/tmp/data/",
"fileName": "test",
"writeMode": "truncate|append|nonConflict",
"fieldDelimiter": ",",
"encoding": "UTF-8",
"nullFormat": "null",
"dateFormat": "yyyy-MM-dd",
"fileFormat": "csv",
"useKey": false,
"keyPath": "",
"keyPass": "",
"header": []
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
protocol | 是 | ftp |
ftp/sftp 服务器协议,目前支持传输协议有ftp和sftp |
host | 是 | 无 | ftp/sftp 服务器地址 |
port | 否 | 22/21 | 若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21 |
timeout | 否 | 60000 |
连接ftp服务器连接超时时间,单位毫秒(ms) |
connectPattern | 否 | PASV |
连接模式,仅支持 PORT , PASV 模式。该参数只在传输协议是标准ftp协议时使用 | |
username | 是 | 无 | ftp/sftp 服务器访问用户名 |
password | 是 | 无 | ftp/sftp 服务器访问密码 |
useKey | 否 | false | 是否使用私钥登录,仅针对 sftp 登录有效 |
keyPath | 否 | ~/.ssh/id_rsa |
私钥地址,如不填写,则使用默认私钥 ~/.ssh/id_rsa |
keyPass | 否 | 无 | 私钥密码,若没有设置私钥密码,则无需配置该项 |
path | 是 | 无 | 远程FTP文件系统的路径信息,FtpWriter会写入Path目录下属多个文件 |
fileName | 是 | 无 | FtpWriter写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名 |
writeMode | 是 | 无 | FtpWriter写入前数据清理处理模式,支持 truncate , append , nonConflict ,详见下文 |
fieldDelimiter | 是 | , |
描述:读取的字段分隔符 |
compress | 否 | 无 | 文本压缩类型,暂不支持 |
encoding | 否 | utf-8 |
读取文件的编码配置 |
dateFormat | 否 | 无 | 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd" |
fileFormat | 否 | text |
文件写出的格式,包括csv, text两种, |
header | 否 | 无 | text写出时的表头,示例 ['id', 'name', 'age'] |
nullFormat | 否 | \N |
定义哪些字符串可以表示为null |
maxTraversalLevel | 否 | 100 | 允许遍历文件夹的最大层数 |
csvReaderConfig | 否 | 无 | 读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值,详见下文 |
writeMode#
描述:FtpWriter写入前数据清理处理模式:
truncate
,写入前清理目录下一fileName前缀的所有文件。append
,写入前不做任何处理,Addax FtpWriter直接使用filename写入,并保证文件名不冲突。nonConflict
,如果目录下有fileName前缀的文件,直接报错。
认证#
从 4.0.2
版本开始, 支持私钥认证方式登录 SFTP 服务器,如果密码和私有都填写了,则两者认证方式都会尝试。
注意,如果填写了 keyPath
, keyPass
项,但 useKey
设置为 false
,插件依然不会尝试用私钥进行登录。
类型转换#
FTP文件本身不提供数据类型,该类型是 Addax FtpWriter 定义:
Addax 内部类型 | FTP文件 数据类型 |
---|---|
Long | Long -> 字符串序列化表示 |
Double | Double -> 字符串序列化表示 |
String | String -> 字符串序列化表示 |
Boolean | Boolean -> 字符串序列化表示 |
Date | Date -> 字符串序列化表示 |
Greenplum Writer#
GreenplumWriter 插件使用 copy from
语法 将数据写入 Greenplum 数据库。
示例#
以下配置演示从postgresql指定的表读取数据,并插入到具有相同表结构的另外一张表中,用来测试该插件所支持的数据类型。
create table if not exists addax_tbl
(
c_bigint
bigint,
c_bit
bit
(
3
),
c_bool boolean,
c_byte bytea,
c_char char
(
10
),
c_varchar varchar
(
20
),
c_date date,
c_double float8,
c_int integer,
c_json json,
c_number decimal
(
8,
3
),
c_real real,
c_small smallint,
c_text text,
c_ts timestamp,
c_uuid uuid,
c_xml xml,
c_money money,
c_inet inet,
c_cidr cidr,
c_macaddr macaddr
);
insert into addax_tbl
values (999988887777,
B'101',
TRUE,
'\xDEADBEEF',
'hello',
'hello, world',
'2021-01-04',
999888.9972,
9876542,
'{"bar": "baz", "balance": 7.77, "active": false}'::json,
12345.123,
123.123,
126,
'this is a long text ',
'2020-01-04 12:13:14',
'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11'::uuid,
'<foo>bar</foo>'::xml,
'52093.89'::money,
'192.168.1.1'::inet,
'192.168.1/24'::cidr,
'08002b:010203'::macaddr);
创建需要插入的表的语句如下:
create table gp_test like addax_tbl;
任务配置#
以下是配置文件
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "wgzhao",
"password": "wgzhao",
"column": [
"*"
],
"connection": [
{
"table": [
"addax_tbl"
],
"jdbcUrl": [
"jdbc:postgresql://localhost:5432/wgzhao"
]
}
]
}
},
"writer": {
"name": "greenplumwriter",
"parameter": {
"username": "wgzhao",
"password": "wgzhao",
"queueSize": 5,
"numProc": 2,
"numWriter": 1,
"column": [
"*"
],
"preSql": [
"truncate table @table"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://localhost:5432/wgzhao",
"table": [
"gp_test"
]
}
]
}
}
}
]
}
}
将上述配置文件保存为 job/pg2gp.json
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接 附件控制信息 | |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
preSql | 否 | 无 | 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table 表示 |
postSql | 否 | 无 | 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳 |
queueSize | 否 | 1000 | 线程队列大小,增大此参数增加内存消耗,提升性能 |
numProc | 否 | 4 | 用于进行格式化数据的线程数 |
numWriter | 否 | 1 | 写入数据库的并发数 |
类型转换#
Addax 内部类型 | Greenplum 数据类型 |
---|---|
Long | bigint, bigserial, integer, smallint, serial |
Double | double precision, money, numeric, real |
String | varchar, char, text, bit, inet,cidr,macaddr,uuid,xml,json |
Date | date, time, timestamp |
Boolean | bool |
Bytes | bytea |
Hbase11X Writer#
HbaseWriter 插件实现了从向Hbase中写取数据。在底层实现上,HbaseWriter 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 put 方式写入Hbase。
如果 HBase 是 2.X 版本,则需要使用 HBase20xsqlwriter 插件
配置样例#
配置一个从本地写入hbase1.1.x的作业:
{
"job": {
"setting": {
"speed": {
"channel": 5,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": "/tmp/normal.txt",
"charset": "UTF-8",
"column": [
{
"index": 0,
"type": "String"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "***"
},
"table": "writer",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
}
],
"column": [
{
"index": 1,
"name": "cf1:q1",
"type": "string"
},
{
"index": 2,
"name": "cf1:q2",
"type": "string"
},
{
"index": 3,
"name": "cf1:q3",
"type": "string"
},
{
"index": 4,
"name": "cf2:q1",
"type": "string"
},
{
"index": 5,
"name": "cf2:q2",
"type": "string"
},
{
"index": 6,
"name": "cf2:q3",
"type": "string"
}
],
"versionColumn": {
"index": -1,
"value": "123456789"
},
"encoding": "utf-8"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
hbaseConfig | 是 | 无 | 连接 HBase 集群需要的配置信息,JSON格式, hbase.zookeeper.quorum 为必填项,其他 HBase client的配置为可选项 |
mode | 是 | 无 | 写入 HBase 的模式,目前仅支持 normal 模式 |
table | 是 | 无 | HBase 表名(大小写敏感) |
encoding | 否 | UTF-8 | 编码方式,UTF-8 或是 GBK ,用于对二进制存储的 HBase byte[] 转为 String 时的编码 |
column | 是 | 无 | 要写入的字段,normal 模式与 multiVersionFixedColumn 模式下必填项, 详细说明见下文 |
rowkeyColumn | 是 | 无 | 要写入的 rowkey 列, 详细说明见下文 |
versionColumn | 否 | 无 | 指定写入的时间戳。支持:当前时间、指定时间列,指定时间,三者选一,详见下文 |
nullMode | 否 | skip | 读取的null值时,如何处理, skip 表示不向hbase写这列;empty :写入 HConstants.EMPTY_BYTE_ARRAY ,即new byte [0] |
walFlag | 否 | false | 是否写 WAL , true 表示写入, false 表示不写 |
writeBufferSize | 否 | 8M | 设置写 buffer 大小,单位字节 |
maxVersion | 是 | 无 | 指定在多版本模式下读取的版本数,-1 表示读取所有版本, multiVersionFixedColumn 模式下必填 |
range | 否 | 无 | 指定读取的 rowkey 范围, 详见下文 |
scanCacheSize | 否 | 256 | 每次从服务器端读取的行数 |
scanBatchSize | 否 | 100 | 每次从服务器端读取的列数 |
column#
要写入的hbase字段。index:指定该列对应reader端column的索引,从0开始;name:指定hbase表中的列,必须为 列族:列名 的格式;type:指定写入数据类型,用于转换HBase byte[]。配置格式如下:
{
"column": [
{
"index": 1,
"name": "cf1:q1",
"type": "string"
},
{
"index": 2,
"name": "cf1:q2",
"type": "string"
}
]
}
rowkey#
要写入的 rowkey
列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量。配置格式如下:
{
"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
}
]
}
versionColumn#
指定写入的时间戳。支持:当前时间、指定时间列,指定时间,三者选一。若不配置表示用当前时间。
index:指定对应reader端column的索引,从0开始,需保证能转换为long,若是Date类型, 会尝试用yyyy-MM-dd HH:mm:ss
和yyyy-MM-dd HH:mm:ss SSS
去解析; 若为指定时间index为 -1
;
value:指定时间的值,long值。配置格式如下:
{
"versionColumn": {
"index": 1
}
}
或者
{
"versionColumn": {
"index": -1,
"value": 123456789
}
}
HBase11xsql Writer#
HBase11xsqlwriter 插件利用 Phoniex, 用于向 HBase 1.x 版本的数据库写入数据。
如果你希望通过调用原生接口写入数据,则需要使用HBase11xWriter 插件
如果 HBase 是 2.X 版本,则需要使用 HBase20xsqlwriter 插件
配置样例#
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": "/tmp/normal.txt",
"charset": "UTF-8",
"column": [
{
"index": 0,
"type": "String"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xsqlwriter",
"parameter": {
"batchSize": "256",
"column": [
"UID",
"TS",
"EVENTID",
"CONTENT"
],
"haveKerberos": "true",
"kerberosPrincipal": "hive@EXAMPLE.COM",
"kerberosKeytabFilePath": "/tmp/hive.headless.keytab",
"hbaseConfig": {
"hbase.zookeeper.quorum": "node1,node2,node3:2181",
"zookeeper.znode.parent": "/hbase-secure"
},
"nullMode": "skip",
"table": "TEST_TBL"
}
}
}
],
"setting": {
"speed": {
"channel": 5,
"bytes": -1
}
}
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
hbaseConfig | 是 | 无 | hbase集群地址,zk为必填项,格式:ip1,ip2,ip3[:port] ,znode 是可选的,默认值是 /hbase |
table | 是 | 无 | 要导入的表名,大小写敏感,通常phoenix表都是 大写 表名 |
column | 是 | 无 | 列名,大小写敏感,通常phoenix的列名都是 大写 ,数据类型无需填写,会自动获取列 |
batchSize | 否 | 256 | 一次写入的最大记录数 |
nullMode | 否 | skip | 读取到的列值为null时,如何处理。支持 skip , empty ,前者表示跳过该列,后者表示插入空值,数值类型为0,字符类型为null |
haveKerberos | 否 | false | 是否启用Kerberos认证, true 表示启用, false 表示不启用 |
kerberosPrincipal | 否 | null | kerberos 凭证信息,仅当 havekerberos 启用后有效 |
kerberosKeytabFilePath | 否 | null | kerberos 凭证文件的绝对路径,仅当 havekerberos 启用后有效 |
注意:启用kerberos认证后,程序需要知道hbase-site.xml
所在的路径,一种办法是运行执行在环境变量 CLASSPATH
中增加该文件的所在路径。
另外一个解决办法是修改 addax.py
中的 CLASS_PATH
变量,增加 hbase-site.xml
的路径
HBase20xsql Writer#
HBase20xsqlwriter 插件利用 Phoenix 向 HBase 2.x 写入数据。
如果 HBase 是 1.X 版本,则可以使用 HBase11xsqlWriter 或HBase11xWriter 插件
配置样例#
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": "/tmp/normal.txt",
"charset": "UTF-8",
"column": [
{
"index": 0,
"type": "String"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase20xsqlwriter",
"parameter": {
"batchSize": "100",
"column": [
"UID",
"TS",
"EVENTID",
"CONTENT"
],
"queryServerAddress": "http://127.0.0.1:8765",
"nullMode": "skip",
"table": "TEST_TBL"
}
}
}
],
"setting": {
"speed": {
"channel": 5,
"bytes": -1
}
}
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
queryServerAddress | 是 | 无 | Phoenix QueryServer 地址, 该插件通过 PQS 进行连接 |
serialization | 否 | PROTOBUF | QueryServer使用的序列化协议 |
table | 是 | 无 | 所要读取表名 |
schema | 否 | 无 | 表所在的schema |
batchSize | 否 | 256 | 一次批量写入的最大行数 |
column | 否 | 全部列 | 列名,大小写敏感,通常phoenix的列名都是大写, 数据类型无需填写,会自动获取列 |
nullMode | 否 | skip | 读取的null值时,如何处理, skip 表示不向hbase写这列;empty :写入 HConstants.EMPTY_BYTE_ARRAY ,即new byte [0] |
Hdfs Writer#
HdfsWriter 提供向HDFS文件系统指定路径中写入 TEXTFile
, ORCFile
, PARQUET
等格式文件的能力, 文件内容可与 hive 中表关联。
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://xxx:port",
"fileType": "orc",
"path": "/user/hive/warehouse/writerorc.db/orcfull",
"fileName": "xxxx",
"column": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "int"
},
{
"name": "col3",
"type": "string"
},
{
"name": "col4",
"type": "boolean"
},
{
"name": "col5",
"type": "string"
}
],
"writeMode": "overwrite",
"fieldDelimiter": "\u0001",
"compress": "SNAPPY"
}
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 说明 |
---|---|---|---|
path | 是 | 无 | 要读取的文件路径 |
defaultFS | 是 | 无 | Hadoop hdfs 文件系统 NAMENODE 节点地址,如果配置了 HA 模式,则为 defaultFS 的值 |
fileType | 是 | 无 | 文件的类型 |
fileName | 是 | 无 | 要写入的文件名,用于当作前缀 |
column | 是 | 无 | 写入的字段列表 |
writeMode | 是 | 无 | 写入模式,支持 append , overwrite , nonConflict |
fieldDelimiter | 否 | , |
指定文本文件的字段分隔符,二进制文件不需要指定该项 |
encoding | 否 | utf-8 |
文件的编码配置, 目前仅支持 utf-8 |
nullFormat | 否 | 无 | 自定义哪些字符可以表示为空,例如如果用户配置: "\\N" ,那么如果源头数据是 "\N" ,视作 null 字段 |
haveKerberos | 否 | 无 | 是否启用 Kerberos 认证,如果启用,则需要同时配置 kerberosKeytabFilePath ,kerberosPrincipal |
kerberosKeytabFilePath | 否 | 无 | 用于 Kerberos 认证的凭证文件路径, 比如 /your/path/addax.service.keytab |
kerberosPrincipal | 否 | 无 | 用于 Kerberos 认证的凭证主体, 比如 addax/node1@WGZHAO.COM |
compress | 否 | 无 | 文件的压缩格式 |
hadoopConfig | 否 | 无 | 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置 |
path#
存储到 Hadoop hdfs文件系统的路径信息,HdfsWriter 会根据并发配置在 Path
目录下写入多个文件。为与hive表关联,请填写hive表在hdfs上的存储路径。
例:Hive上设置的数据仓库的存储路径为:/user/hive/warehouse/
,已建立数据库:test
,表:hello
;
则对应的存储路径为:/user/hive/warehouse/test.db/hello
(如果建表时指定了location
属性,则依据该属性的路径)
defaultFS#
Hadoop hdfs文件系统 namenode 节点地址。格式:hdfs://ip:port
;例如:hdfs://127.0.0.1:9000
,
如果启用了HA,则为 servicename 模式,比如 hdfs://sandbox
fileType#
描述:文件的类型,目前只支持用户配置为
text 表示 Text file文件格式
orc 表示 OrcFile文件格式
parquet 表示 Parquet 文件格式
rc 表示 Rcfile 文件格式
seq 表示sequence file文件格式
csv 表示普通hdfs文件格式(逻辑二维表)
column#
写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型, 其中:name
指定字段名,type
指定字段类型。
用户可以指定 column
字段信息,配置如下:
{
"column": [
{
"name": "userName",
"type": "string"
},
{
"name": "age",
"type": "long"
},
{
"name": "salary",
"type": "decimal(8,2)"
}
]
}
对于数据类型是 decimal
类型的,需要注意:
如果没有指定精度和小数位,则使用默认的
decimal(38,10)
表示如果仅指定了精度但未指定小数位,则小数位用0表示,即
decimal(p,0)
如果都指定,则使用指定的规格,即
decimal(p,s)
writeMode#
写入前数据清理处理模式:
append,写入前不做任何处理,直接使用
filename
写入,并保证文件名不冲突。overwrite 如果写入目录存在数据,则先删除,后写入
nonConflict,如果目录下有
fileName
前缀的文件,直接报错。
compress#
描述:hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)
hadoopConfig#
hadoopConfig
里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置
{
"hadoopConfig": {
"dfs.nameservices": "cluster",
"dfs.ha.namenodes.cluster": "nn1,nn2",
"dfs.namenode.rpc-address.cluster.nn1": "node1.example.com:8020",
"dfs.namenode.rpc-address.cluster.nn2": "node2.example.com:8020",
"dfs.client.failover.proxy.provider.cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}
这里的 cluster
表示 HDFS 配置成HA时的名字,也是 defaultFS
配置项中的名字 如果实际环境中的名字不是 cluster
,则上述配置中所有写有 cluster
都需要替换
haveKerberos#
是否有Kerberos认证,默认 false
, 如果用户配置true,则配置项 kerberosKeytabFilePath
,kerberosPrincipal
为必填。
kerberosKeytabFilePath#
Kerberos认证 keytab文件路径,绝对路径
kerberosPrincipal#
描述:Kerberos认证Principal名,如 xxxx/hadoopclient@xxx.xxx
类型转换#
Addax 内部类型 | HIVE 数据类型 |
---|---|
Long | TINYINT,SMALLINT,INT,INTEGER,BIGINT |
Double | FLOAT,DOUBLE,DECIMAL |
String | STRING,VARCHAR,CHAR |
Boolean | BOOLEAN |
Date | DATE,TIMESTAMP |
Bytes | BINARY |
功能与限制#
目前不支持:
binary
、arrays
、maps
、structs
、union类型
InfluxDB Writer#
InfluxDBWriter 插件实现了将数据写入 InfluxDB 读取数据的功能。 底层实现上,是通过调用 InfluQL 语言接口,构建插入语句,然后进行数据插入。
示例#
以下示例用来演示该插件从内存读取数据并写入到指定表
创建 job 文件#
创建 job/stream2kudu.json
文件,内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random":"2001-01-01 00:00:00, 2016-07-07 23:59:59",
"type":"date"
},
{
"random": "1,1000",
"type": "long"
},
{
"random": "1,10",
"type": "string"
},
{
"random": "1000,50000",
"type": "double"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "influxdbwriter",
"parameter": {
"connection": [
{
"endpoint": "http://localhost:8086",
"database": "addax",
"table": "addax_tbl"
}
],
"connTimeout": 15,
"readTimeout": 20,
"writeTimeout": 20,
"username": "influx",
"password": "influx123",
"column": [
{"name":"time", "type":"timestamp"},
{"name":"user_id","type":"int"},
{"name":"user_name", "type":"string"},
{"name":"salary", "type":"double"}
],
"preSql": ["delete from addax_tbl"],
"batchSize": 1024,
"retentionPolicy": {"name":"one_day_only", "duration": "1d", "replication":1}
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
endpoint | 是 | string | 无 | InfluxDB 连接串 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
database | 是 | string | 无 | 数据源指定的数据库 |
table | 是 | string | 无 | 要写入的表(指标) |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合 |
connTimeout | 否 | int | 15 | 设置连接超时值,单位为秒 |
readTimeout | 否 | int | 20 | 设置读取超时值,单位为秒 |
writeTimeout | 否 | int | 20 | 设置写入超时值,单位为秒 |
preSql | 否 | list | 无 | 插入数据前执行的SQL语句 |
postSql | 否 | list | 无 | 数据插入完毕后需要执行的语句 |
retentionPolicy | 否 | dict | 无 | 设置数据库的 Retention Policy 策略 |
column#
InfluxDB 作为时序数据库,需要每条记录都有时间戳字段,因此这里会把 column
配置的第一个字段默认当作时间戳
类型转换#
当前支持 InfluxDB 的基本类型
限制#
当前插件仅支持 1.x 版本,2.0 及以上并不支持
Kudu Writer#
KuduWriter 插件实现了将数据写入到 kudu 的能力,当前是通过调用原生RPC接口来实现的。 后期希望通过 impala 接口实现,从而增加更多的功能。
示例#
以下示例演示了如何从内存读取样例数据并写入到 kudu 表中的。
表结构#
我们用 trino 工具连接到 kudu 服务,然后通过下面的 SQL 语句创建表
CREATE TABLE kudu.default.users (
user_id int WITH (primary_key = true),
user_name varchar,
salary double
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 2
);
job 配置文件#
创建 job/stream2kudu.json
文件,内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random": "1,1000",
"type": "long"
},
{
"random": "1,10",
"type": "string"
},
{
"random": "1000,50000",
"type": "double"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "kuduwriter",
"parameter": {
"masterAddress": "127.0.0.1:7051,127.0.0.1:7151,127.0.0.1:7251",
"timeout": 60,
"table": "users",
"writeMode": "upsert",
"column": [
{"name":"user_id","type":"int8"},
{"name":"user_name", "type":"string"},
{"name":"salary", "type":"double"}
],
"batchSize": 1024,
"bufferSize": 2048,
"skipFail": false,
"encoding": "UTF-8"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
masterAddress | 必须 | string | 无 | Kudu Master集群RPC地址,多个地址用逗号(,)分隔 |
table | 必须 | string | 无 | kudu 表名 |
writeMode | 否 | string | upsert | 表数据写入模式,支持 upsert, insert 两者 |
timeout | 否 | int | 60 | 写入数据超时时间(秒) |
column | 是 | list | 无 | 要写入的表字段及类型,如果配置为 "*" ,则会从目标表中读取所有字段 |
skipFail | 否 | boolean | false | 是否跳过插入失败的记录,如果设置为true,则插件不会把插入失败的当作异常 |
已知限制#
暂时不支持
truncate table
MongoDB Writer#
MongoDBWriter 插件用于向 MongoDB 写入数据。
配置样例#
该示例将流式数据写入到 MongoDB 表中
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "unique_id",
"type": "string"
},
{
"value": "sid",
"type": "string"
},
{
"value": "user_id",
"type": "string"
},
{
"value": "auction_id",
"type": "string"
},
{
"value": "content_type",
"type": "string"
},
{
"value": "pool_type",
"type": "string"
},
{
"value": "a1 a2 a3",
"type": "string"
},
{
"value": "c1 c2 c3",
"type": "string"
},
{
"value": "2020-09-06",
"type": "string"
},
{
"value": "tag1 tag2 tag3",
"type": "string"
},
{
"value": "property",
"type": "string"
},
{
"value": 1984,
"type": "long"
},
{
"value": 1900,
"type": "long"
},
{
"value": 75,
"type": "long"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "mongodbwriter",
"parameter": {
"address": [
"127.0.0.1:32768"
],
"userName": "",
"userPassword": "",
"dbName": "tag_per_data",
"collectionName": "tag_data",
"column": [
{
"name": "unique_id",
"type": "string"
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "Array",
"splitter": " "
},
{
"name": "categoryid",
"type": "Array",
"splitter": " "
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "Array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
}
],
"upsertInfo": {
"isUpsert": "true",
"upsertKey": "unique_id"
}
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
address | 是 | 无 | MongoDB的数据地址信息,因为MongoDB可能是个集群,则ip端口信息需要以Json数组的形式给出 |
userName | 否 | 无 | MongoDB的用户名 |
userPassword | 否 | 无 | MongoDB的密码 |
collectionName | 是 | 无 | MongoDB的集合名 |
column | 是 | 无 | MongoDB的文档列名 |
name | 是 | 无 | Column的名字 |
type | 否 | 无 | Column的类型 |
splitter | 否 | 无 | 特殊分隔符,当且仅当要处理的字符串要用分隔符分隔为字符数组时,才使用这个参数,通过这个参数指定的分隔符,将字符串分隔存储到MongoDB的数组中 |
upsertInfo | 否 | 无 | 指定了传输数据时更新的信息 |
isUpsert | 否 | 无 | 当设置为true时,表示针对相同的upsertKey做更新操作 |
upsertKey | 否 | 无 | upsertKey指定了没行记录的业务主键。用来做更新时使用 |
类型转换#
Addax 内部类型 | MongoDB 数据类型 |
---|---|
Long | int, Long |
Double | double |
String | string, array |
Date | date |
Boolean | boolean |
Bytes | bytes |
Mysql Writer#
MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。
示例#
假定要写入的 MySQL 表建表语句如下:
create table test.addax_tbl
(
col1 varchar(20) ,
col2 int(4),
col3 datetime,
col4 boolean,
col5 binary
) default charset utf8;
这里使用一份从内存产生到 Mysql 导入的数据。
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "",
"column": [
"*"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from @table"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useSSL=false",
"table": [
"addax_tbl"
],
"driver": "com.mysql.jdbc.Driver"
}
]
}
}
}
]
}
}
将上述配置文件保存为 job/stream2mysql.json
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | list | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
driver | 否 | string | 无 | 自定义驱动类名,解决兼容性问题,详见下面描述 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | list | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
session | 否 | list | 空 | Addax在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性 |
preSql | 否 | list | 无 | 数据写入钱先执行的sql语句,例如清除旧数据,如果 Sql 中有你需要操作到的表名称,可用 @table 表示 |
postSql | 否 | list | 无 | 数据写入完成后执行的sql语句,例如加上某一个时间戳 |
writeMode | 是 | string | insert | 数据写入表的方式, insert 表示采用 insert into , replace 表示采用replace into 方式 update 表示采用 ON DUPLICATE KEY UPDATE 语句 |
batchSize | 否 | int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
driver#
当前采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver
,而不是 com.mysql.jdbc.Driver
。
如果你需要采集的 MySQL 服务低于 5.6
,需要使用到 Connector/J 5.1
驱动,则可以采取下面的步骤:
替换插件内置的驱动
rm -f plugin/writer/mysqlwriter/lib/mysql-connector-java-*.jar
拷贝老的驱动到插件目录
cp mysql-connector-java-5.1.48.jar plugin/writer/mysqlwriter/lib/
指定驱动类名称
在你的 json 文件类,配置 "driver": "com.mysql.jdbc.Driver"
类型转换#
Addax 内部类型 | Mysql 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint, year |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
bit类型目前是未定义类型转换
Oracle Writer#
OracleWriter 插件实现了写入数据到 Oracle 主库的目的表的功能。
配置样例#
这里使用一份从内存产生到 Oracle 导入的数据。
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
"table": [
"test"
]
}
]
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接 附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
writeMode | 否 | insert | 写入方式,支持 insert, update,详见下文 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
preSql | 否 | 无 | 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table 表示 |
postSql | 否 | 无 | 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳 |
batchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
session | 否 | 无 | 设置oracle连接时的session信息, 详见下文 |
writeMode#
默认情况下, 采取 insert into
语法写入 Oracle 表,如果你希望采取主键存在时更新,不存在则写入的方式,也就是 Oracle 的 merge into
语法, 可以使用 update
模式。假定表的主键为 id
,则 writeMode
配置方法如下:
"writeMode": "update(id)"
如果是联合唯一索引,则配置方法如下:
"writeMode": "update(col1, col2)"
注: update
模式在 3.1.6
版本首次增加,之前版本并不支持。
session#
描述:设置oracle连接时的session信息,格式示例如下:
{
"session": [
"alter session set nls_date_format = 'dd.mm.yyyy hh24:mi:ss';",
"alter session set NLS_LANG = 'AMERICAN';"
]
}
##类型转换
Addax 内部类型 | Oracle 数据类型 |
---|---|
Long | NUMBER,INTEGER,INT,SMALLINT |
Double | NUMERIC,DECIMAL,FLOAT,DOUBLE PRECISION,REAL |
String | LONG,CHAR,NCHAR,VARCHAR,VARCHAR2,NVARCHAR2,CLOB,NCLOB,CHARACTER,CHARACTER VARYING,CHAR VARYING |
String | NATIONAL CHARACTER,NATIONAL CHAR,NATIONAL CHARACTER VARYING,NATIONAL CHAR VARYING,NCHAR VARYING |
Date | TIMESTAMP,DATE |
Boolean | BIT, BOOL |
Bytes | BLOB,BFILE,RAW,LONG RAW |
Postgresql Writer#
PostgresqlWriter插件实现了写入数据到 PostgreSQL 数据库库表的功能。
示例#
以下配置演示从postgresql指定的表读取数据,并插入到具有相同表结构的另外一张表中,用来测试该插件所支持的数据类型。
表结构信息#
假定建表语句以及输入插入语句如下:
create table if not exists addax_tbl
(
c_bigint bigint,
c_bit bit(3),
c_bool boolean,
c_byte bytea,
c_char char(10),
c_varchar varchar(20),
c_date date,
c_double float8,
c_int integer,
c_json json,
c_number decimal(8,3),
c_real real,
c_small smallint,
c_text text,
c_ts timestamp,
c_uuid uuid,
c_xml xml,
c_money money,
c_inet inet,
c_cidr cidr,
c_macaddr macaddr
);
insert into addax_tbl values(
999988887777,
B'101',
TRUE,
'\xDEADBEEF',
'hello',
'hello, world',
'2021-01-04',
999888.9972,
9876542,
'{"bar": "baz", "balance": 7.77, "active": false}'::json,
12345.123,
123.123,
126,
'this is a long text ',
'2020-01-04 12:13:14',
'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11'::uuid,
'<foo>bar</foo>'::xml,
'52093.89'::money,
'192.168.1.1'::inet,
'192.168.1/24'::cidr,
'08002b:010203'::macaddr
);
创建需要插入的表的语句如下:
create table addax_tbl1 like addax_tbl;
任务配置#
以下是配置文件
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "pgtest",
"password": "pgtest",
"column": [
"*"
],
"connection": [
{
"table": [
"addax_tbl"
],
"jdbcUrl": [
"jdbc:postgresql://localhost:5432/pgtest"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"column": [
"*"
],
"preSql": [
"truncate table @table"
],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/pgtest",
"table": [
"addax_tbl1"
]
}
],
"username": "pgtest",
"password": "pgtest",
"writeMode": "insert"
}
}
}
]
}
}
将上述配置文件保存为 job/pg2pg.json
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接 附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
writeMode | 否 | insert | 写入模式,支持insert, update 详见如下 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
preSql | 否 | 无 | 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table 表示 |
postSql | 否 | 无 | 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳 |
batchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
writeMode#
默认情况下, 采取 insert into
语法写入 postgresql 表,如果你希望采取主键存在时更新,不存在则写入的方式, 可以使用 update
模式。假定表的主键为 id
,则 writeMode
配置方法如下:
"writeMode": "update(id)"
如果是联合唯一索引,则配置方法如下:
"writeMode": "update(col1, col2)"
注: update
模式在 3.1.6
版本首次增加,之前版本并不支持。
类型转换#
目前 PostgresqlWriter支持大部分 PostgreSQL类型,但也存在部分没有支持的情况,请注意检查你的类型。
下面列出 PostgresqlWriter针对 PostgreSQL类型转换列表:
Addax 内部类型 | PostgreSQL 数据类型 |
---|---|
Long | bigint, bigserial, integer, smallint, serial |
Double | double precision, money, numeric, real |
String | varchar, char, text, bit, inet,cidr,macaddr,uuid,xml,json |
Date | date, time, timestamp |
Boolean | bool |
Bytes | bytea |
已知限制#
除以上列出的数据类型外,其他数据类型理论上均为转为字符串类型,但不确保准确性
RDBMS Writer#
RDBMSWriter 插件支持从传统 RDBMS 读取数据。这是一个通用关系数据库读取插件,可以通过注册数据库驱动等方式支持更多关系数据库读取。
同时 RDBMS Writer 又是其他关系型数据库读取插件的的基础类。以下读取插件均依赖该插件
注意, 如果已经提供了专门的数据库写入插件的,推荐使用专用插件,如果你需要写入的数据库没有专门插件,则考虑使用该通用插件。 在使用之前,还需要执行以下操作才可以正常运行,否则运行会出现异常。
配置驱动#
假定你需要写入 IBM DB2 的数据,因为没有提供专门的读取插件,所以我们可以使用该插件来实现,在使用之前,需要执行下面两个操作:
下载对应的 JDBC 驱动,并拷贝到
plugin/writer/rdbmswriter/libs
目录修改
plugin/writer/rdbmswriter/plugin.json
文件,找到drivers
一项,填写正确的 JDBC 驱动名,比如 DB2 的驱动名为com.ibm.db2.jcc.DB2Driver
,类似这样:{ "name": "rdbmswriter", "class": "com.wgzhao.addax.plugin.reader.rdbmswriter.RdbmsWriter", "description": "", "developer": "alibaba", "drivers": ["com.ibm.db2.jcc.DB2Driver"] }
以下列出常见的数据库以及对应的驱动名称
Apache Impala:
com.cloudera.impala.jdbc41.Driver
Enterprise DB:
com.edb.Driver
PrestoDB:
com.facebook.presto.jdbc.PrestoDriver
IBM DB2:
com.ibm.db2.jcc.DB2Driver
MySQL:
com.mysql.cj.jdbc.Driver
Sybase Server:
com.sybase.jdbc3.jdbc.SybDriver
TDengine:
com.taosdata.jdbc.TSDBDriver
达梦数据库:
dm.jdbc.driver.DmDriver
星环Inceptor:
io.transwarp.jdbc.InceptorDriver
TrinoDB:
io.trino.jdbc.TrinoDriver
PrestoSQL:
io.prestosql.jdbc.PrestoDriver
Oracle DB:
oracle.jdbc.OracleDriver
PostgreSQL:
org.postgresql.Drive
配置说明#
配置一个写入RDBMS的作业。
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [
{
"jdbcUrl": "jdbc:dm://ip:port/database",
"driver": "",
"table": [
"table"
]
}
],
"username": "username",
"password": "password",
"column": [
"*"
],
"preSql": [
"delete from XXX;"
]
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | string | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 | |
driver | 否 | string | 无 | 自定义驱动类名,解决兼容性问题,详见下面描述 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | array | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | array | 无 | 所配置的表中需要同步的列名集合,详细描述见后 |
preSql | 否 | array | 无 | 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table 表示 |
postSql | 否 | array | 无 | 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳 |
batchSize | 否 | int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
column#
所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用 *
代表默认使用所有列配置,例如 ["*"]
。
支持列裁剪,即列可以挑选部分列进行导出。
支持列换序,即列可以不按照表schema信息进行导出。
支持常量配置,用户需要按照JSON格式:
["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
id
为普通列名`table`
为包含保留在的列名,1
为整形数字常量,'bazhen.csy'
为字符串常量null
为空指针,注意,这里的null
必须以字符串形式出现,即用双引号引用to_char(a + 1)
为表达式,2.3
为浮点数,true
为布尔值,同样的,这里的布尔值也必须用双引号引用
Column必须显示填写,不允许为空!
jdbcUrl#
jdbcUrl
配置除了配置必要的信息外,我们还可以在增加每种特定驱动的特定配置属性,这里特别提到我们可以利用配置属性对代理的支持从而实现通过代理访问数据库的功能。
比如对于 PrestoSQL 数据库的 JDBC 驱动而言,支持 socksProxy
参数,比如一个可能的 jdbcUrl
为
jdbc:presto://127.0.0.1:8080/hive?socksProxy=192.168.1.101:1081
大部分关系型数据库的 JDBC 驱动支持 socksProxyHost,socksProxyPort
参数来支持代理访问。也有一些特别的情况。
以下是各类数据库 JDBC 驱动所支持的代理类型以及配置方式
数据库 | 代理类型 | 代理配置 | 例子 |
---|---|---|---|
MySQL | socks | socksProxyHost,socksProxyPort | socksProxyHost=192.168.1.101&socksProxyPort=1081 |
Presto | socks | socksProxy | socksProxy=192.168.1.101:1081 |
Presto | http | httpProxy | httpProxy=192.168.1.101:3128 |
driver#
大部分情况下,一个数据库的JDBC驱动是固定的,但有些因为版本的不同,所建议的驱动类名不同,比如 MySQL。
新的 MySQL JDBC 驱动类型推荐使用 com.mysql.cj.jdbc.Driver
而不是以前的 com.mysql.jdbc.Drver
。如果想要使用就的驱动名称,则可以配置 driver
配置项。
Redis Writer#
RedisWrite 提供了还原Redis dump命令的能力,并写入到目标Redis。支持redis cluster集群、proxy、以及单机
配置样例#
{
"job": {
"content": [
{
"reader": {
"name": "redisreader",
"parameter": {
"connection": [
{
"uri": "file:///root/dump.rdb",
"uri": "http://localhost/dump.rdb",
"uri": "tcp://127.0.0.1:7001",
"uri": "tcp://127.0.0.1:7002",
"uri": "tcp://127.0.0.1:7003"
}
]
}
},
"writer": {
"name": "rediswriter",
"parameter": {
"connection": [
{
"uri": "tcp://127.0.0.1:6379",
"auth": "123456"
}
],
"redisCluster": false,
"flushDB": false
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
}
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
uri | 是 | 否 | redis链接,,如果是集群,单机/proxy/redis cluster集群只需填写一个地址即可, 程序会自动获取集群里的所有master |
redisCluster | 否 | false | redis cluster集群请务必填写此项,否者无法定位slot槽。如果是proxy或单机忽略该项 |
flushDB | 否 | false | 迁移前格式化目标Redis |
batchSize | 否 | 1000 | 每次批量处理数量。如果key过大/小,可以相应的调整 |
timeout | 否 | 60000 | 每次执行最大超时时间, 单位毫秒(ms) |
include | 否 | 无 | 要包含的 key, 支持正则表达式 |
exclude | 否 | 无 | 要排除的 key,支持正则表达式 |
SqlServer Writer#
SqlServerWriter 插件实现了写入数据到 SqlServer 库表的功能。
配置样例#
这里使用一份从内存产生到 SqlServer 导入的数据。
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {},
"writer": {
"name": "sqlserverwriter",
"parameter": {
"username": "root",
"password": "root",
"column": [
"db_id",
"db_type",
"db_ip",
"db_port",
"db_role",
"db_name",
"db_username",
"db_password",
"db_modify_time",
"db_modify_user",
"db_description",
"db_tddl_info"
],
"connection": [
{
"table": [
"db_info_for_writer"
],
"jdbcUrl": "jdbc:sqlserver://[HOST_NAME]:PORT;DatabaseName=[DATABASE_NAME]"
}
],
"preSql": [
"delete from @table where db_id = -1;"
],
"postSql": [
"update @table set db_modify_time = now() where db_id = 1;"
]
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
jdbcUrl | 是 | 无 | 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
username | 是 | 无 | 数据源的用户名 |
password | 否 | 无 | 数据源指定用户名的密码 |
table | 是 | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
splitPk | 否 | 无 | 使用splitPk代表的字段进行数据分片,详细描述见 rdbmsreader |
preSql | 否 | 无 | 数据写入前先执行的sql语句 |
postSql | 否 | 无 | 数据写入完成后,再执行的SQL语句 |
batchSize | 否 | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM |
##类型转换
类似 SqlServerReader ,目前 SqlServerWriter 支持大部分 SqlServer 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
Stream Writer#
StreamWriter 是一个将数据写入内存的插件,一般用来将获取到的数据写到终端,用来调试读取插件的数据处理情况。
一个典型的 streamwriter 配置如下:
{
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
上述配置会将获取的数据直接打印到终端。 该插件也支持将数据写入到文件,配置如下:
{
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"path": "/tmp/out",
"fileName": "out.txt",
"fieldDelimiter": ",",
"recordNumBeforeSleep": "100",
"sleepTime": "5"
}
}
上述配置中:
fieldDelimiter
表示字段分隔符,默认为制表符(\t
)recordNumBeforeSleep
表示获取多少条记录后,执行休眠,默认为0,表示不启用该功能sleepTime
则表示休眠多长时间,单位为秒,默认为0,表示不启用该功能。
上述配置的含义是将数据写入到 /tmp/out/out.txt
文件, 每获取100条记录后,休眠5秒。
TDengine Writer#
TDengineWriter 插件实现了将数据写入到涛思公司的 TDengine 数据库系统。在底层实现上,TDengineWriter 通过JDBC JNI 驱动连接远程 TDengine 数据库, 并执行相应的sql语句将数据批量写入 TDengine 库中。
前置条件#
考虑到性能问题,该插件使用了 TDengine 的 JDBC-JNI 驱动, 该驱动直接调用客户端 API(libtaos.so
或 taos.dll
)将写入和查询请求发送到 taosd
实例。
因此在使用之前需要配置好动态库链接文件。
首先将 plugin/writer/tdenginewriter/libs/libtaos.so.2.0.16.0
拷贝到 /usr/lib64
目录,然后执行下面的命令创建软链接
ln -sf /usr/lib64/libtaos.so.2.0.16.0 /usr/lib64/libtaos.so.1
ln -sf /usr/lib64/libtaos.so.1 /usr/lib64/libtaos.so
示例#
假定要写入的表如下:
create table test.addax_test (
ts timestamp,
name nchar(100),
file_size int,
file_date timestamp,
flag_open bool,
memo nchar(100)
);
以下是配置文件
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"random":"2017-08-01 00:01:02,2020-01-01 12:13:14",
"type": "date"
},
{
"value": "Addax",
"type": "string"
},
{
"value": 19880808,
"type": "long"
},
{
"value": "1988-08-08 08:08:08",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": ["ts", "name", "file_size", "file_date", "flag_open", "memo" ],
"connection": [
{
"jdbcUrl": "jdbc:TAOS://127.0.0.1:6030/test",
"table": [ "addax_test"]
}
]
}
}
}
]
}
}
将上述配置文件保存为 job/stream2tdengine.json
执行采集命令#
执行以下命令进行数据采集
bin/addax.sh job/tdengine2stream.json
命令输出类似如下:
2021-02-20 15:52:07.691 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-20 15:52:07.748 [main] INFO Engine -
{
"content":[
{
"reader":{
"parameter":{
"column":[
{
"random":"2017-08-01 00:01:02,2020-01-01 12:13:14",
"type":"date"
},
{
"type":"string",
"value":"Addax"
},
{
"type":"long",
"value":19880808
},
{
"type":"date",
"value":"1988-08-08 08:08:08"
},
{
"type":"bool",
"value":true
},
{
"type":"bytes",
"value":"test"
}
],
"sliceRecordCount":1000
},
"name":"streamreader"
},
"writer":{
"parameter":{
"password":"*****",
"column":[
"ts",
"name",
"file_size",
"file_date",
"flag_open",
"memo"
],
"connection":[
{
"jdbcUrl":"jdbc:TAOS://127.0.0.1:6030/test",
"table":[
"addax_test"
]
}
],
"username":"root",
"preSql":[]
},
"name":"tdenginewriter"
}
}
],
"setting":{
"speed":{
"bytes":-1,
"channel":1
}
}
}
2021-02-20 15:52:07.786 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-20 15:52:07.787 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-20 15:52:07.789 [main] INFO JobContainer - Set jobId = 0
java.library.path:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2021-02-20 15:52:08.048 [job-0] INFO OriginalConfPretreatmentUtil - table:[addax_test] all columns:[ts,name,file_size,file_date,flag_open,memo].
2021-02-20 15:52:08.056 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
INSERT INTO %s (ts,name,file_size,file_date,flag_open,memo) VALUES(?,?,?,?,?,?)
], which jdbcUrl like:[jdbc:TAOS://127.0.0.1:6030/test]
2021-02-20 15:52:11.158 [job-0] INFO JobContainer -
任务启动时刻 : 2021-02-20 15:52:07
任务结束时刻 : 2021-02-20 15:52:11
任务总计耗时 : 3s
任务平均流量 : 11.07KB/s
记录写入速度 : 333rec/s
读出记录总数 : 1000
读写失败总数 : 0
参数说明#
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
jdbcUrl | 是 | list | 无 | 对端数据库的JDBC连接信息,注意,这里的 TAOS 必须大写 |
username | 是 | string | 无 | 数据源的用户名 |
password | 否 | string | 无 | 数据源指定用户名的密码 |
table | 是 | list | 无 | 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构 |
column | 是 | list | 无 | 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter |
preSql | 否 | list | 无 | 数据写入钱先执行的sql语句,例如清除旧数据,如果 Sql 中有你需要操作到的表名称,可用 @table 表示 |
postSql | 否 | list | 无 | 数据写入完成后执行的sql语句,例如加上某一个时间戳 |
batchSize | 否 | int | 1024 | 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起 |
使用 JDBC-RESTful 接口#
如果不想依赖本地库,或者没有权限,则可以使用 JDBC-RESTful
接口来写入表,相比 JDBC-JNI 而言,配置区别是:
driverClass 指定为
com.taosdata.jdbc.rs.RestfulDriver
jdbcUrl 以
jdbc:TAOS-RS://
开头;使用
6041
作为连接端口
所以上述配置中的connection
应该修改为如下:
{
"connection": [
{
"jdbcUrl": "jdbc:TAOS-RS://127.0.0.1:6041/test",
"table": [
"addax_test"
],
"driver": "com.taosdata.jdbc.rs.RestfulDriver"
}
]
}
类型转换#
目前 TDenginereader 支持 TDengine 所有类型,具体如下
Addax 内部类型 | TDengine 数据类型 |
---|---|
Long | SMALLINT, TINYINT, INT, BIGINT, TIMESTAMP |
Double | FLOAT, DOUBLE |
String | BINARY, NCHAR |
Boolean | BOOL |
当前支持版本#
TDengine 2.0.16
注意事项#
TDengine JDBC-JNI 驱动和动态库版本要求一一匹配,因此如果你的数据版本并不是
2.0.16
,则需要同时替换动态库和插件目录中的JDBC驱动TDengine 的时序字段(timestamp)默认最小值为
1500000000000
,即2017-07-14 10:40:00.0
,如果你写入的时许时间戳小于该值,则会报错
TxtFile Writer#
TxtFileWriter提供了向本地文件写入类CSV格式的一个或者多个表文件。
配置样例#
{
"job": {
"setting": {
"speed": {
"channel": 2,
"bytes": -1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/tmp/data"
],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "boolean"
},
{
"index": 2,
"type": "double"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "date",
"format": "yyyy.MM.dd"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/tmp/result",
"fileName": "luohw",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd"
}
}
}
]
}
}
参数说明#
配置项 | 是否必须 | 默认值 | 描述 |
---|---|---|---|
path | 是 | 无 | 本地文件系统的路径信息,写入Path目录下属多个文件 |
fileName | 是 | 无 | 写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名 |
writeMode | 是 | 无 | FtpWriter写入前数据清理处理模式,支持 truncate , append , nonConflict ,详见下文 |
column | 是 | 默认String类型 | 读取字段列表,type指定源数据的类型,详见下文 |
fieldDelimiter | 是 | , |
描述:读取的字段分隔符 |
compress | 否 | 无 | 文本压缩类型,默认不压缩,支持压缩类型为 zip、lzo、lzop、tgz、bzip2 |
encoding | 否 | utf-8 | 读取文件的编码配置 |
nullFormat | 否 | \N |
定义哪些字符串可以表示为null |
dateFormat | 否 | 无 | 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd" |
fileFormat | 否 | text | 文件写出的格式,包括csv, text两种, 详见下文 |
header | 否 | 无 | text写出时的表头,示例 ['id', 'name', 'age'] |
writeMode#
写入前数据清理处理模式:
truncate,写入前清理目录下一fileName前缀的所有文件。
append,写入前不做任何处理,直接使用filename写入,并保证文件名不冲突。
nonConflict,如果目录下有fileName前缀的文件,直接报错。
fileFormat#
文件写出的格式,包括 csv 和 text 两种,csv是严格的csv格式,如果待写数据包括列分隔符,则会按照csv的转义语法转义,转义符号为双引号 "
; text格式是用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不做转义。
类型转换#
Addax 内部类型 | 本地文件 数据类型 |
---|---|
Long | Long |
Double | Double |
String | String |
Boolean | Boolean |
Date | Date |
Transformer#
Transformer 定义#
在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。Addax包含了完成的E(Extract)、T(Transformer)、L(Load)支持。
运行模型#
image
UDF 函数#
dx_substr#
dx_substr(idx, pos, length) -> str
参数
idx
: 字段编号,对应record中第几个字段pos
: 字段值的开始位置length
: 目标字段长度
返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
dx_pad#
dx_pad(idx, flag, length, chr)
参数
idx
: 字段编号,对应record中第几个字段flag
: "l","r", 指示是在头进行填充,还是尾进行填充length
: 目标字段长度chr
: 需要填充的字符
返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
举例:
dx_pad(1,"l","4","A")
: 如果column 1
的值为xyz=> Axyz
, 则转换后的值为xyzzzzz => xyzz
dx_pad(1,"r","4","A")
, 如果column 1
的值为xyz=> xyzA
, 值为xyzzzzz => xyzz
dx_replace#
dx_replace(idx, pos, length, str) -> str
参数
idx
: 字段编号,对应record中第几个字段pos
: 字段值的开始位置length
: 需要替换的字段长度str
: 要替换的字符串
返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
举例:
dx_replace(1,"2","4","****")
: 如果column 1
的值为addaxTest
, 则转换为da****est
dx_replace(1,"5","10","****")
如果column 1
的值为addaxTest
则转换为data****
dx_filter#
dx_filter(idx, operator, expr) -> str
参数:
idx
: 字段编号,对应record中第几个字段operator
: 运算符, 支持like
,not like
,>
,=
,<
,>=
,!=
,<=
expr
: 正则表达式(java正则表达式)、值str
: 要替换的字符串
返回:
如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于
>
,=
,<
都是对字段直接compare的结果.like
,not like
是将字段转换成字符类型,然后和目标正则表达式进行全匹配。>
,=
,<
,>=
,!=
,<=
,按照类型进行比较, 数值类型按大小比较,字符及布尔类型按照字典序比较如果目标字段为空(null),对于
= null
的过滤条件,将满足条件,被过滤。!=null
的过滤条件,null不满足过滤条件,不被过滤。like
,字段为null不满足条件,不被过滤,和not like
,字段为null满足条件,被过滤。
举例
dx_filter(1,"like","dataTest")
dx_filter(1,">=","10")
关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。
dx_groovy#
dx_groovy(code, package) -> record
参数
coee
: 符合 groovy 编码要求的代码package
: extraPackage, 列表或者为空
返回
Record 数据类型
注意:
dx_groovy
只能调用一次。不能多次调用。groovy code
中支持java.lang
,java.util
的包,可直接引用的对象有record
,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。 不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。groovy code
中,返回更新过的Record
(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil)
举例:
groovy 实现的 subStr
String code="Column column = record.getColumn(1);\n"+
" String oriValue = column.asString();\n"+
" String newValue = oriValue.substring(0, 3);\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
dx_groovy(record);
groovy 实现的Replace
String code2="Column column = record.getColumn(1);\n"+
" String oriValue = column.asString();\n"+
" String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
groovy 实现的Pad
String code3="Column column = record.getColumn(1);\n"+
" String oriValue = column.asString();\n"+
" String padString = \"12345\";\n"+
" String finalPad = \"\";\n"+
" int NeedLength = 8 - oriValue.length();\n"+
" while (NeedLength > 0) {\n"+
"\n"+
" if (NeedLength >= padString.length()) {\n"+
" finalPad += padString;\n"+
" NeedLength -= padString.length();\n"+
" } else {\n"+
" finalPad += padString.substring(0, NeedLength);\n"+
" NeedLength = 0;\n"+
" }\n"+
" }\n"+
" String newValue= finalPad + oriValue;\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
Job定义#
本例中,配置4个UDF。
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "My name is xxxx",
"type": "string"
},
{
"value": "password is Passw0rd",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
},
{
"random": "0,10",
"type": "long"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
},
"transformer": [
{
"name": "dx_replace",
"parameter": {
"columnIndex": 0,
"paras": [
"11",
"6",
"wgzhao"
]
}
},
{
"name": "dx_substr",
"parameter": {
"columnIndex": 1,
"paras": [
"0",
"12"
]
}
},
{
"name": "dx_map",
"parameter": {
"columnIndex": 2,
"paras": [
"^",
"2"
]
}
},
{
"name": "dx_filter",
"parameter": {
"columnIndex": 6,
"paras": [
"<",
"5"
]
}
}
]
}
]
}
}
自定义函数#
如果自带的函数不满足数据转换要求,我们可以在 transformer
编写满足 groovy
规范要求的代码,下面给出一个完整的例子
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "Addax",
"type": "string"
},
{
"incr": "1",
"type": "long"
},
{
"incr": "1989/06/04 00:00:01,-1",
"type": "date",
"dateFormat": "yyyy/MM/dd hh:mm:ss"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"column": [
"col1"
],
"encoding": "UTF-8"
}
},
"transformer": [
{
"name": "dx_groovy",
"description": "Add string 'Header_' to the first column value;Double the value of the second field",
"parameter": {
"code": "record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"
}
}
]
}
]
}
}
上述 transformer
代码针对每条记录的前面两个字段做了修改,对第一个字段的字符串,在字符串前面增加 Header_
字符;
第二个整数字段值进行倍增处理。最后执行的结果如下:
$ bin/addax.sh job/transformer_demo.json
___ _ _
/ _ \ | | | |
/ /_\ \ __| | __| | __ ___ __
| _ |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |> <
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version :: (v4.0.2-SNAPSHOT)
2021-08-04 15:45:56.421 [ main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-04 15:45:56.443 [ main] INFO Engine -
.....
2021-08-04 15:45:56.458 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-04 15:45:56.459 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-04 15:45:56.460 [ main] INFO JobContainer - Set jobId = 0
2021-08-04 15:45:56.470 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-04 15:45:56.498 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-04 15:45:56.505 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-08-04 15:45:56.520 [ taskGroup-0] INFO TransformerUtil - user config transformers [[dx_groovy]], loading...
2021-08-04 15:45:56.531 [ taskGroup-0] INFO TransformerUtil - 1 of transformer init success. name=dx_groovy, isNative=true parameter =
{"code":"record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"}
Header_Addax 2 1989-06-04 00:00:01 true test
Header_Addax 4 1989-06-03 00:00:01 true test
Header_Addax 6 1989-06-02 00:00:01 true test
Header_Addax 8 1989-06-01 00:00:01 true test
Header_Addax 10 1989-05-31 00:00:01 true test
Header_Addax 12 1989-05-30 00:00:01 true test
Header_Addax 14 1989-05-29 00:00:01 true test
Header_Addax 16 1989-05-28 00:00:01 true test
Header_Addax 18 1989-05-27 00:00:01 true test
Header_Addax 20 1989-05-26 00:00:01 true test
2021-08-04 15:45:59.515 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-04 15:45:59.517 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-08-04 15:45:59.518 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do post work.
2021-08-04 15:45:59.521 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-04 15:45:59.524 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 330 bytes | Speed 110B/s, 3 records/s | Error 0 records, 0 bytes |
All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transformer Success 10 records | Transformer Error 0 records | Transformer Filter 0 records
| Transformer usedTime 0.383s | Percentage 100.00%
2021-08-04 15:45:59.527 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-04 15:45:56
任务结束时刻 : 2021-08-04 15:45:59
任务总计耗时 : 3s
任务平均流量 : 110B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0
2021-08-04 15:45:59.528 [ job-0] INFO JobContainer -
Transformer成功记录总数 : 10
Transformer失败记录总数 : 0
Transformer过滤记录总数 : 0
计量和脏数据#
Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:
Transform的入参Record条数、字节数。
Transform的出参Record条数、字节数。
Transform的脏数据Record条数、字节数。
如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。
目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。
涉及到运行过程的计量数据展现定义如下:
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。
涉及到最终作业的计量数据展现定义如下:
任务启动时刻 : 2015-03-10 17:34:21
任务结束时刻 : 2015-03-10 17:34:31
任务总计耗时 : 10s
任务平均流量 : 2.10MB/s
记录写入速度 : 100000rec/s
转换输入总数 : 1000000
转换输出总数 : 1000000
读出记录总数 : 1000000
同步失败总数 : 0
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。
任务结果上报服务器功能 说明#
快速介绍#
主要用于将定时任务的结果上报给指定服务器
功能与限制#
支付http协议,JSON格式。
接口地址配置在
core.json
文件下的core.addaxServer.address
下。异步发送。
需要引入
httpclient-4.5.2.jar
,httpcore-4.4.5.jar
,httpcore-nio-4.4.5.jar
,httpasyncclient-4.1.2.jar
相关jar包
功能说明#
配置样例#
{
"jobName": "test",
"startTimeStamp": 1587971621,
"endTimeStamp": 1587971621,
"totalCosts": 10,
"byteSpeedPerSecond": 33,
"recordSpeedPerSecond": 1,
"totalReadRecords": 6,
"totalErrorRecords": 0
}
参数说明#
参数 | 描述 | 必选 | 默认值 |
---|---|---|---|
jobName | 任务名 | 是 | jobName |
startTimeStamp | 任务执行的开始时间 | 是 | 无 |
endTimeStamp | 任务执行的结束时间 | 是 | 无 |
totalCosts | 任务总计耗时 | 是 | 无 |
byteSpeedPerSecond | 任务平均流量 | 是 | 无 |
recordSpeedPerSecond | 记录写入速度 | 是 | 无 |
totalReadRecords | 读出记录总数 | 是 | 0 |
totalErrorRecords | 读写失败总数 | 是 | 0 |
jobName
的设置规则如下:
在命令行通过传递
-P-DjobName=xxxx
方式指定,否则配置文件的
writer.parameters.path
值按/
分割后取第2,3列用点(.)拼接而成,其含义是为库名及表名,否则否则设置为
jobName
Addax 插件开发简明指南#
本指南主要面向那些需要开发符合自己需求的 Addax 插件开发人员。
插件机制#
Addax
为了应对不同数据源的差异、同时提供一致地同步原语和扩展能力,采用了 框架
+ 插件
的模式:
插件只需关心数据的读取或者写入本身。
而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。
作为插件开发人员,则需要关注两个问题:
数据源本身的读写数据正确性。
如何与框架沟通、合理正确地使用框架。
插件视角看框架#
逻辑执行模型#
插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:
Job
:Job
是Addax用以描述从一个源头到一个目的端的同步作业,是Addax数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。Task
:Task
是为最大化而把Job
拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job
,拆分成1024个读Task
,用若干个并发执行。TaskGroup
: 描述的是一组Task
集合。在同一个TaskGroupContainer
执行下的Task
集合称之为TaskGroup
JobContainer
:Job
执行器,负责Job
全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTrackerTaskGroupContainer
:TaskGroup
执行器,负责执行一组Task
的工作单元,类似Yarn中的TaskTracker。
简而言之, Job
拆分成Task
,在分别在框架提供的容器中执行,插件只需要实现 Job
和 Task
两部分逻辑。
物理执行模型#
框架为插件提供物理上的执行能力(线程)。Addax
框架有三种运行模式:
Standalone
: 单进程运行,没有外部依赖。Local
: 单进程运行,统计信息、错误信息汇报到集中存储。Distrubuted
: 分布式多进程运行,依赖Addax Service
服务。
当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。 当 JobContainer
和 TaskGroupContainer
运行在同一个进程内时,就是单机模式(Standalone
和Local
);当它们分布在不同的进程中执行时,就是分布式(Distributed
)模式。
编程接口#
那么,Job
和 Task
的逻辑应是怎么对应到具体的代码中的?
首先,插件的入口类必须扩展 Reader
或 Writer
抽象类,并且实现分别实现 Job
和 Task
两个内部抽象类,Job
和 Task
的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例:
public class SomeReader
extends Reader
{
public static class Job
extends Reader.Job
{
@Override
public void init()
{
}
@Override
public void prepare()
{
}
@Override
public List<Configuration> split(int adviceNumber)
{
return null;
}
@Override
public void post()
{
}
@Override
public void destroy()
{
}
}
public static class Task
extends Reader.Task
{
@Override
public void init()
{
}
@Override
public void prepare()
{
}
@Override
public void startRead(RecordSender recordSender)
{
}
@Override
public void post()
{
}
@Override
public void destroy()
{
}
}
}
Job
接口功能如下:
init
: Job对象初始化工作,测试可以通过super.getPluginJobConf()
获取与本插件相关的配置。读插件获得配置中reader
部分,写插件获得writer
部分。prepare
: 全局准备工作,比如 mysql 清空目标表。split
: 拆分Task
。参数adviceNumber
框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task
的配置列表。post
: 全局的后置工作,比如 mysql writer 同步完影子表后的rename操作。destroy
: Job对象自身的销毁工作。
Task
接口功能如下:
init
:Task对象的初始化。此时可以通过super.getPluginJobConf()
获取与本Task
相关的配置。这里的配置是Job
的split
方法返回的配置列表中的其中一个。prepare
:局部的准备工作。startRead
: 从数据源读数据,写入到RecordSender
中。RecordSender
会把数据写入连接Reader和Writer的缓存队列。startWrite
:从RecordReceiver
中读取数据,写入目标数据源。RecordReceiver
中的数据来自Reader和Writer之间的缓存队列。post
: 局部的后置工作。destroy
: Task象自身的销毁工作。
需要注意的是:
Job
和Task
之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。prepare
和post
在Job
和Task
中都存在,插件需要根据实际情况确定在什么地方执行操作。
框架按照如下的顺序执行 Job
和 Task
的接口:
AddaxReaderWriter
上图中,黄色表示Job
部分的执行阶段,蓝色表示Task
部分的执行阶段,绿色表示框架执行阶段。
相关类关系如下:
Addax
插件定义#
代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?
在每个插件的项目中,都有一个plugin.json
文件,这个文件定义了插件的相关信息,包括入口类。例如:
{
"name": "mysqlwriter",
"class": "com.wgzhao.addax.plugin.writer.mysqlwriter.MysqlWriter",
"description": "Use Jdbc connect to database, execute insert sql.",
"developer": "wgzhao"
}
name
: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。class
: 入口类的全限定名称,框架通过反射穿件入口类的实例。十分重要 。description
: 描述信息。developer
: 开发人员。
打包发布#
Addax
使用 assembly
打包,打包命令如下:
mvn clean package
mvn package assembly:single
Addax
插件需要遵循统一的目录结构:
${ADDAX_HOME}
|-- bin
| `-- addax.py
|-- conf
| |-- core.json
| `-- logback.xml
|-- lib
| `-- addax-core-dependencies.jar
`-- plugin
|-- reader
| `-- mysqlreader
| |-- libs
| | `-- mysql-reader-plugin-dependencies.jar
| |-- mysqlreader-0.0.1-SNAPSHOT.jar
| `-- plugin.json
`-- writer
|-- mysqlwriter
| |-- libs
| | `-- mysql-writer-plugin-dependencies.jar
| |-- mysqlwriter-0.0.1-SNAPSHOT.jar
| `-- plugin.json
|-- oceanbasewriter
`-- odpswriter
${ADDAX_HOME}/bin
: 可执行程序目录。${ADDAX_HOME}/conf
: 框架配置目录。${ADDAX_HOME}/lib
: 框架依赖库目录。${ADDAX_HOME}/plugin
: 插件目录。
插件目录分为reader
和writer
子目录,读写插件分别存放。插件目录规范如下:
${PLUGIN_HOME}/libs
: 插件的依赖库。${PLUGIN_HOME}/plugin-name-version.jar
: 插件本身的jar。${PLUGIN_HOME}/plugin.json
: 插件描述文件。
尽管框架加载插件时,会把 ${PLUGIN_HOME}
下所有的jar放到 classpath
,但还是推荐依赖库的jar和插件本身的jar分开存放。
注意:
插件的目录名字必须和 plugin.json
中定义的插件名称一致。
配置文件#
Addax
使用 json
作为配置文件的格式。一个典型的 Addax
任务配置如下:
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "",
"password": "",
"column": [
"c_datetime",
"c_timestamp",
"c_enum",
"c_set",
"c_varbinary",
"c_longblob",
"c_mediumblob"
],
"connection": [
{
"table": [
"datax_reader"
],
"jdbcUrl": [
"jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Chongqing"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "",
"password": "",
"preSql": [
"truncate table @table"
],
"column": [
"c_datetime",
"c_timestamp",
"c_enum",
"c_set",
"c_varbinary",
"c_longblob",
"c_mediumblob"
],
"connection": [
{
"table": [
"tbl_from_mysql"
],
"jdbcUrl": "jdbc:postgresql://localhost:5432/wgzhao"
}
]
}
}
}
]
}
}
Addax
框架有 core.json
配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖 core.json
中的默认值。
配置中job.content.reader.parameter
的value部分会传给Reader.Job
;job.content.writer.parameter
的value部分会传给Writer.Job
,Reader.Job
和Writer.Job
可以通过super.getPluginJobConf()
来获取。
Addax
框架支持对特定的配置项进行RSA加密,例子中以*
开头的项目便是加密后的值。 配置项加密解密过程对插件透明,插件仍然以不带*
的key来查询配置和操作配置项 。
如何设计配置参数#
配置文件的设计是插件开发的第一步!
任务配置中reader
和writer
下parameter
部分是插件的配置参数,插件的配置参数应当遵循以下原则:
驼峰命名:所有配置项采用驼峰命名法,首字母小写,单词首字母大写。
正交原则:配置项必须正交,功能没有重复,没有潜规则。
富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。
使用正确的数据类型。比如,bool类型的值使用
true
/false
,而非"yes"
/"true"
/0
等。合理使用集合类型,比如,用数组替代有分隔符的字符串。
类似通用:遵守同一类型的插件的习惯,比如关系型数据库的
connection
参数都是如下结构:{ "connection": [ { "table": [ "table_1", "table_2" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database_1", "jdbc:mysql://127.0.0.2:3306/database_1_slave" ] }, { "table": [ "table_3", "table_4" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.3:3306/database_2", "jdbc:mysql://127.0.0.4:3306/database_2_slave" ] } ] }
如何使用 Configuration
类#
为了简化对json的操作,Addax
提供了简单的DSL配合Configuration
类使用。
Configuration
提供了常见的get
, 带类型get
,带默认值get
,set
等读写配置项的操作,以及clone
, toJSON
等方法。配置项读写操作都需要传入一个path
做为参数,这个path
就是Addax
定义的DSL。语法有两条:
子map用
.key
表示,path
的第一个点省略。数组元素用
[index]
表示。
比如操作如下json:
{
"a": {
"b": {
"c": 2
},
"f": [
1,
2,
{
"g": true,
"h": false
},
4
]
},
"x": 4
}
比如调用configuration.get(path)
方法,当path为如下值的时候得到的结果为:
x
:4
a.b.c
:2
a.b.c.d
:null
a.b.f[0]
:1
a.b.f[2].g
:true
注意,因为插件看到的配置只是整个配置的一部分。使用Configuration
对象时,需要注意当前的根路径是什么。
更多Configuration
的操作请参考ConfigurationTest.java
。
插件数据传输#
跟一般的 生产者-消费者
模式一样,Reader
插件和 Writer
插件之间也是通过 channel
来实现数据的传输的。channel
可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSender
往channel
写入数据,通过RecordReceiver
从channel
读取数据。
channel
中的一条数据为一个 Record
的对象,Record
中可以放多个 Column
对象,这可以简单理解为数据库中的记录和列。
Record
有如下方法:
public interface Record
{
// 加入一个列,放在最后的位置
void addColumn(Column column);
// 在指定下标处放置一个列
void setColumn(int i, final Column column);
// 获取一个列
Column getColumn(int i);
// 转换为json String
String toString();
// 获取总列数
int getColumnNumber();
// 计算整条记录在内存中占用的字节数
int getByteSize();
}
因为Record
是一个接口,Reader
插件首先调用RecordSender.createRecord()
创建一个Record
实例,然后把Column
一个个添加到Record
中。
Writer
插件调用RecordReceiver.getFromReader()
方法获取Record
,然后把Column
遍历出来,写入目标存储中。当Reader
尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()
方法会阻塞直到有数据。如果传输已经结束,会返回null
,Writer
插件可以据此判断是否结束startWrite
方法。
Column
的构造和操作,我们在《类型转换》一节介绍。
类型转换#
为了规范源端和目的端类型转换操作,保证数据不失真,Addax支持六种内部数据类型:
Long
:定点数(Int、Short、Long、BigInteger等)。Double
:浮点数(Float、Double、BigDecimal(无限精度)等)。String
:字符串类型,底层不限长,使用通用字符集(Unicode)。Date
:日期类型。Bool
:布尔值。Bytes
:二进制,可以存放诸如MP3等非结构化数据。
对应地,有DateColumn
、LongColumn
、DoubleColumn
、BytesColumn
、StringColumn
和BoolColumn
六种Column
的实现。
Column
除了提供数据相关的方法外,还提供一系列以as
开头的数据类型转换转换方法。
Columns
Addax的内部类型在实现上会选用不同的java类型:
内部类型 | 实现类型 | 备注 |
---|---|---|
Date | java.util.Date | |
Long | java.math.BigInteger | 使用无限精度的大整数,保证不失真 |
Double | java.lang.String | 用String表示,保证不失真 |
Bytes | byte[] | |
String | java.lang.String | |
Bool | java.lang.Boolean |
类型之间相互转换的关系如下:
from/to | Date | Long | Double | Bytes | String | Bool |
---|---|---|---|---|---|---|
Date | - | 使用毫秒时间戳 | 不支持 | 不支持 | 使用系统配置的date/time/datetime格式转换 | 不支持 |
Long | 作为毫秒时间戳构造Date | - | BigInteger转为BigDecimal,然后BigDecimal.doubleValue() | 不支持 | BigInteger.toString() | 0为false,否则true |
Double | 不支持 | 内部String构造BigDecimal,然后BigDecimal.longValue() | - | 不支持 | 直接返回内部String | |
Bytes | 不支持 | 不支持 | 不支持 | - | 按照common.column.encoding 配置的编码转换为String,默认utf-8 |
不支持 |
String | 按照配置的date/time/datetime/extra格式解析 | 用String构造BigDecimal,然后取longValue() | 用String构造BigDecimal,然后取doubleValue(),会正确处理NaN /Infinity /-Infinity |
按照common.column.encoding 配置的编码转换为byte[],默认utf-8 |
- | "true"为true , "false"为false ,大小写不敏感。其他字符串不支持 |
Bool | 不支持 | true 为1L ,否则0L |
true 为1.0 ,否则0.0 |
不支持 | - |
脏数据处理#
如何处理脏数据#
在Reader.Task
和Writer.Task
中,功过AbstractTaskPlugin.getPluginCollector()
可以拿到一个TaskPluginCollector
,它提供了一系列collectDirtyRecord
的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord
方法,把被认为是脏数据的Record
传入即可。
用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。
加载原理#
框架扫描
plugin/reader
和plugin/writer
目录,加载每个插件的plugin.json
文件。以
plugin.json
文件中name
为key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。用户在插件中在
reader
/writer
配置的name
字段指定插件名字。框架根据插件的类型(reader
/writer
)和插件名称去插件的路径下扫描所有的jar,加入classpath
。根据插件配置中定义的入口类,框架通过反射实例化对应的
Job
和Task
对象。