Addax 介绍#

Addax 介绍#

一、 Addax 概览#

Addax 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

images/addax_why_new.pngaddax_why_new

设计理念#

为了解决异构数据源同步问题,Addax将复杂的网状的同步链路变成了星型数据链路,Addax作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到Addax,便能跟已有的数据源做到无缝数据同步。

当前使用现状#

Addax在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了6年之久。目前每天完成同步8w多道作业,每日传输数据量超过300TB。

二、Addax 框架设计#

images/addax-framework_new.pngaddax_framework_new

Addax本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

Addax Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

四、Addax 核心架构#

Addax 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个Addax作业生命周期的时序图,从整体架构设计非常简要说明Addax各个模块相互关系。

images/addax_arch.pngaddax_arch

核心模块介绍:#

  1. Addax完成单个数据同步的作业,我们称之为Job,Addax接受到一个Job之后,将启动一个进程来完成整个作业同步过程。Addax Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. AddaxJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是Addax作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,Addax Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动 Reader—>Channel—>Writer 的线程来完成任务同步工作。

  5. Addax作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

Addax调度流程:#

举例来说,用户提交了一个Addax作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 Addax的调度决策思路是:

  1. AddaxJob根据分库分表切分成了100个Task。

  2. 根据20个并发,Addax计算共需要分配 20/5 = 4 个TaskGroup。

  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

五、Addax 核心优势#

可靠的数据质量监控#

  • 完美解决数据传输个别类型失真问题

    Addax旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本Addax3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

  • 提供作业全链路的流量、数据量运行时监控

    Addax3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

  • 提供脏数据探测

    在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据Addax认为就是脏数据。Addax目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

丰富的数据转换功能#

Addax作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看Addax3的transformer详细介绍。

精准的速度控制#

新版本Addax3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

"speed": {
"channel": 5,
"byte": 1048576,
"record": 10000
}
强劲的同步性能#

Addax 每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让Addax速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。 另外,Addax团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试。性能测试相关详情可以参照每单个数据源的详细介绍:Addax数据源指南

健壮的容错机制#

Addax作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是Addax的基本要求,在Addax 3.0的设计中,重点完善了框架和插件的稳定性。目前Addax3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。

  • 线程内部重试

    Addax的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

  • 线程级别重试

    目前Addax已经可以实现 TaskFailover,针对于中间失败的Task,Addax框架可以做到整个Task级别的重新调度。

读取插件#

本章描述 Addax 目前支持的数据读取插件

Cassandra Reader#

CassandraReader 插件实现了从Cassandra 读取数据。

配置#

下面是配置一个从 Cassandra 读取数据到终端的例子

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "cassandrareader",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "test",
            "table": "addax_src",
            "column": [
              "textCol",
              "blobCol",
              "writetime(blobCol)",
              "boolCol",
              "smallintCol",
              "tinyintCol",
              "intCol",
              "bigintCol",
              "varintCol",
              "floatCol",
              "doubleCol",
              "decimalCol",
              "dateCol",
              "timeCol",
              "timeStampCol",
              "uuidCol",
              "inetCol",
              "durationCol",
              "listCol",
              "mapCol",
              "setCol",
              "tupleCol",
              "udtCol"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

参数说明#

parameter 配置项支持以下配置

配置项 是否必须 默认值 描述
host Cassandra连接点的域名或ip,多个node之间用逗号分隔
port 9042 Cassandra端口
username 数据源的用户名
password 数据源指定用户名的密码
useSSL false 是否使用SSL连接
keyspace 需要同步的表所在的keyspace
table 所选取的需要同步的表
column 所配置的表中需要同步的列集合,其中的元素可以指定列的名称或 writetime(column_name),后一种形式会读取column_name列的时间戳而不是数据
where 数据筛选条件的 cql 表达式
allowFiltering 是否在服务端过滤数据,详细描述参考官方文档的相关描述
consistancyLevel LOCAL_QUORUM 数据一致性级别, 可选 ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE
支持的数据类型#

目前支持除 counterCustom 类型之外的所有类型。

下面列出 CassandraReader 针对 Cassandra 类型转换列表:

Addax 内部类型 Cassandra 数据类型
Long int, tinyint, smallint,varint,bigint,time
Double float, double, decimal
String ascii,varchar, text,uuid,timeuuid,duration,list,map,set,tuple,udt,inet
Date date, timestamp
Boolean bool
Bytes blob

ClickHouse Reader#

ClickHouseReader 插件支持从 ClickHouse数据库读取数据。

示例#

表结构及数据信息#

假定需要的读取的表的结构以及数据如下:

CREATE TABLE ck_addax (
    c_int8 Int8,
    c_int16 Int16,
    c_int32 Int32,
    c_int64 Int64,
    c_uint8 UInt8,
    c_uint16 UInt16,
    c_uint32 UInt32,
    c_uint64 UInt64,
    c_float32 Float32,
    c_float64 Float64,
    c_decimal Decimal(38,10),
    c_string String,
    c_fixstr FixedString(36),
    c_uuid UUID,
    c_date Date,
    c_datetime DateTime('Asia/Chongqing'),
    c_datetime64 DateTime64(3, 'Asia/Chongqing'),
    c_enum Enum('hello' = 1, 'world'=2)
) ENGINE = MergeTree() ORDER BY (c_int8, c_int16) SETTINGS index_granularity = 8192;

insert into ck_addax values(
    127,
    -32768,
    2147483647,
    -9223372036854775808,
    255,
    65535,
    4294967295,
    18446744073709551615,
    0.999999999999,
    0.99999999999999999,
    1234567891234567891234567891.1234567891,
    'Hello String',
    '2c:16:db:a3:3a:4f',
    '5F042A36-5B0C-4F71-ADFD-4DF4FCA1B863',
    '2021-01-01',
    '2021-01-01 00:00:00',
    '2021-01-01 00:00:00',
    'hello'
);

配置 json 文件#

下面的配置文件表示从 ClickHouse 数据库读取指定的表数据并打印到终端

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "clickhousereader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "ck_addax"
                ],
                "jdbcUrl": [
                  "jdbc:clickhouse://127.0.0.1:8123/default"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/clickhouse2stream.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/clickhouse2stream.json

其输出信息如下(删除了非关键信息)

2021-01-06 14:39:35.742 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl

2021-01-06 14:39:35.767 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"column":[
						"*"
					],
					"connection":[
						{
							"jdbcUrl":[
								"jdbc:clickhouse://127.0.0.1:8123/"
							],
							"table":[
								"ck_addax"
							]
						}
					],
					"username":"default"
				},
				"name":"clickhousereader"
			},
			"writer":{
				"parameter":{
					"print":true
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"channel":3
		}
	}
}

127	-32768	2147483647	-9223372036854775808	255	65535	4294967295	18446744073709551615	1	1	1234567891234567891234567891.1234567891Hello String	2c:16:db:a3:3a:4f	
5f042a36-5b0c-4f71-adfd-4df4fca1b863	2021-01-01	2021-01-01 00:00:00	2021-01-01 00:00:00	hello

任务启动时刻                    : 2021-01-06 14:39:35
任务结束时刻                    : 2021-01-06 14:39:39
任务总计耗时                    :                  3s
任务平均流量                    :               77B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   1
读写失败总数                    :                   0

参数说明#

parameter 配置项支持以下配置

配置项 是否必须 类型 默认值 描述
jdbcUrl array ClickHouse JDBC 连接信息 ,可按照官方规范填写连接附件控制信息。具体请参看ClickHouse官方文档
username string 数据源的用户名
password string 数据源指定用户名的密码
table array 所选取的需要同步的表 ,当配置为多张表时,用户自己需保证多张表是同一schema结构
column array 所配置的表中需要同步的列名集合, 使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 "['*']"
splitPk string 希望使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
autoPk bool false 是否自动猜测分片主键,3.2.6 版本引入
where string 筛选条件
querySql array 使用SQL查询而不是直接指定表的方式读取数据,当用户配置querySql时,ClickHouseReader直接忽略table、column、where条件的配置

支持的数据类型#

目前ClickHouseReader支持大部分ClickHouse类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出ClickHouseReader针对ClickHouse类型转换列表:

Addax 内部类型 ClickHouse 数据类型
Long Uint8, Uint16, Uint32, Uint64, Int8, Int16, Int32, Int64, Enum8, Enum16
Double Float32, Float64, Decimal
String String, FixedString(N)
Date Date, DateTime, DateTime64
Boolean UInt8
Bytes String

限制#

除上述罗列字段类型外,其他类型均不支持,如Array、Nested等

DbfFile Reader#

DbfFileReader 插件支持读取DBF格式文件

配置说明#

以下是读取 DBF 文件后打印到终端的配置样例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "dbffilereader",
          "parameter": {
            "column": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "long"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "boolean"
              },
              {
                "index": 4,
                "type": "string"
              },
              {
                "value": "dbf",
                "type": "string"
              }
            ],
            "path": [
              "/tmp/out"
            ],
            "encoding": "GBK"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": "true"
          }
        }
      }
    ]
  }
}

参数说明#

parameter 配置项支持以下配置

配置项 是否必须 默认值 描述
path DBF文件路径,支持写多个路径,详细情况见下
column 类型默认为String 所配置的表中需要同步的列集合, 是 {type: value}{type: index} 的集合,详细配置见下
encoding GBK DBF文件编码,比如 GBK, UTF-8
nullFormat \N 定义哪个字符串可以表示为null,
path#

描述:本地文件系统的路径信息,注意这里可以支持填写多个路径。

  • 当指定单个本地文件,DbfFileReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取。

  • 当指定多个本地文件,DbfFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。

  • 当指定通配符,DbfFileReader尝试遍历出多个文件信息。例如: 指定 /* 代表读取/目录下所有的文件,指定 /bazhen/* 代表读取bazhen目录下游所有的文件。 dbfFileReader目前只支持 * 作为文件通配符。

特别需要注意的是,Addax会将一个作业下同步的所有dbf File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类dbf格式,并且提供给Addax权限可读。

特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。

column#

读取字段列表,type指定源数据的类型,name为字段名,长度最大8,value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。

默认情况下,用户可以全部按照String类型读取数据,配置如下:

"column": ["*"]

用户可以指定Column字段信息,配置如下:

{
  "type": "long",
  "index": 0
},
{
"type": "string",
"value": "alibaba"
}

index: 0 表示从本地DBF文件第一列获取int字段 value: alibaba 表示从dbfFileReader内部生成alibaba的字符串字段作为当前字段 对于用户指定Column信息,type必须填写,index/value必须选择其一。

支持的数据类型#

本地文件本身提供数据类型,该类型是Addax dbfFileReader定义:

Addax 内部类型 本地文件 数据类型
Long Long
Double Double
String String
Boolean Boolean
Date Date

其中:

  • Long 是指本地文件文本中使用整形的字符串表示形式,例如"19901219"。

  • Double 是指本地文件文本中使用Double的字符串表示形式,例如"3.1415"。

  • Boolean 是指本地文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。

  • Date 是指本地文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。

ElasticSearchReader#

ElasticSearchReader 插件实现了从 Elasticsearch 读取索引的功能, 它通过 Elasticsearch 提供的 Rest API (默认端口9200),执行指定的查询语句批量获取数据

示例#

假定要获取的索引内容如下

{
"took": 14,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "test-1",
"_type": "default",
"_id": "38",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
},
{
"_index": "test-1",
"_type": "default",
"_id": "103",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
}
]
}
}

配置一个从 Elasticsearch 读取数据并打印到终端的任务

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId":"",
            "accesskey":"",
            "index": "test-1",
            "type": "default",
            "searchType": "dfs_query_then_fetch",
            "headers": {},
            "scroll": "3m",
            "search": [
              {
                "query": {
                  "match": {
                    "col_ip": "1.1.1.1"
                  }
                },
                "aggregations": {
                  "top_10_states": {
                    "terms": {
                      "field": "col_date",
                      "size": 10
                    }
                  }
                }
              }
            ],
            "column": ["col_ip", "col_double", "col_long","col_integer",
              "col_keyword", "col_text","col_geo_point","col_date"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

将上述内容保存为 job/es2stream.json

执行下面的命令进行采集

bin/addax.py job/es2stream.json

其输出结果类似如下(输出记录数有删减)

2021-02-19 13:38:15.860 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"accessId":"",
					"headers":{},
					"endpoint":"http://127.0.0.1:9200",
					"search":[
                      {
                        "query": {
                          "match": {
                            "col_ip": "1.1.1.1"
                          }
                        },
                        "aggregations": {
                          "top_10_states": {
                            "terms": {
                              "field": "col_date",
                              "size": 10
                            }
                          }
                        }
                      }
					],
					"accesskey":"*****",
					"searchType":"dfs_query_then_fetch",
					"scroll":"3m",
					"column":[
						"col_ip",
						"col_double",
						"col_long",
						"col_integer",
						"col_keyword",
						"col_text",
						"col_geo_point",
						"col_date"
					],
					"index":"test-1",
					"type":"default"
				},
				"name":"elasticsearchreader"
			},
			"writer":{
				"parameter":{
					"print":true,
					"encoding":"UTF-8"
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"byte":-1,
			"channel":1
		}
	}
}

2021-02-19 13:38:15.934 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO  JobContainer - Set jobId = 0

2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34
2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34

2021-02-19 13:38:19.845 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO  JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-19 13:38:15
任务结束时刻                    : 2021-02-19 13:38:19
任务总计耗时                    :                  3s
任务平均流量                    :            2.84KB/s
记录写入速度                    :             31rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
endpoint string ElasticSearch的连接地址
accessId string "" http auth中的user
accessKey string "" http auth中的password
index string elasticsearch中的index名
type string index名 elasticsearch中index的type名
search list [] json格式api搜索数据体
column list 需要读取的字段
timeout int 60 客户端超时时间(单位:秒)
discovery boolean false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression boolean true http请求,开启压缩
multiThread boolean true http请求,是否有多线程
searchType string dfs_query_then_fetch 搜索类型
headers map {} http请求头
scroll string "" 滚动分页配置
searchType#

searchType 目前支持以下几种:

  • dfs_query_then_fetch

  • query_then_fetch

  • count

  • scan

Addax FtpReader 说明#

1 快速介绍#

FtpReader提供了读取远程FTP文件系统数据存储的能力。在底层实现上,FtpReader获取远程FTP文件数据,并转换为Addax传输协议传递给Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。

2 功能与限制#

FtpReader实现了从远程FTP文件读取数据并转为Addax协议的功能,远程FTP文件本身是无结构化数据存储,对于Addax而言,FtpReader实现上类比TxtFileReader,有诸多相似之处。目前FtpReader支持功能如下:

  1. 支持且仅支持读取TXT的文件,且要求TXT中shema为一张二维表。

  2. 支持类CSV格式文件,自定义分隔符。

  3. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量

  4. 支持递归读取、支持文件名过滤。

  5. 支持文本压缩,现有压缩格式为zip、gzip、bzip2。

  6. 多个File可以支持并发读取。

我们暂时不能做到:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。

  2. 单个File在压缩情况下,从技术上无法支持多线程并发读取。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "ftpreader",
          "parameter": {
            "protocol": "sftp",
            "host": "127.0.0.1",
            "port": 22,
            "username": "xx",
            "password": "xxx",
            "path": [
              "/var/pub/ftpReaderTest/data"
            ],
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": 1,
                "type": "boolean"
              },
              {
                "index": 2,
                "type": "double"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "date",
                "format": "yyyy.MM.dd"
              }
            ],
            "encoding": "UTF-8",
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "ftpWriter",
          "parameter": {
            "path": "/var/ftp/FtpWriter/result",
            "fileName": "shihf",
            "writeMode": "truncate",
            "format": "yyyy-MM-dd"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
protocol ftp服务器协议,目前支持传输协议有ftp和sftp
host ftp服务器地址
port 22/21 若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21
timeout 60000 连接ftp服务器连接超时时间,单位毫秒(ms)
connectPattern PASV 连接模式,仅支持 PORT, PASV 模式。该参数只在传输协议是标准ftp协议时使用
username ftp服务器访问用户名
password ftp服务器访问密码
path 远程FTP文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下
column 默认String类型 读取字段列表,type指定源数据的类型,详见下文
fieldDelimiter , 描述:读取的字段分隔符
compress 文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、gzip、bzip2
encoding utf-8 读取文件的编码配置
skipHeader false 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过
nullFormat \N 定义哪些字符串可以表示为null
maxTraversalLevel 100 允许遍历文件夹的最大层数
csvReaderConfig 读取CSV类型文件参数配置,Map类型。不配置则使用默认值,详见下文
path#

远程FTP文件系统的路径信息,注意这里可以支持填写多个路径。

  • 当指定单个远程FTP文件,FtpReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取 = 当指定多个远程FTP文件,FtpReader支持使用多线程进行数据抽取。线程并发数通过通道数指定

  • 当指定通配符,FtpReader尝试遍历出多个文件信息。例如: 指定 /* 代表读取/目录下所有的文件,指定 /bazhen/* 代表读取 bazhen 目录下游所有的文件。目前只支持 * 作为文件通配符。

特别需要注意的是,Addax会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类CSV格式,并且提供给Addax权限可读。 特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。

column#

读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。

默认情况下,用户可以全部按照String类型读取数据,配置如下:

    "column": ["*"]

用户可以指定Column字段信息,配置如下:

{
  "type": "long",
  "index": 0
  //从远程FTP文件文本第一列获取int字段
},
{
"type": "string",
"value": "alibaba"  //从FtpReader内部生成alibaba的字符串字段作为当前字段
}

对于用户指定Column信息,type必须填写,index/value必须选择其一。

csvReaderConfig#

常见配置:

"csvReaderConfig":{
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}

所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置

boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
3.3 类型转换#

远程FTP文件本身不提供数据类型,该类型是Addax FtpReader定义:

Addax 内部类型 远程FTP文件 数据类型
Long Long
Double Double
String String
Boolean Boolean
Date Date

其中:

  • Long 是指远程FTP文件文本中使用整形的字符串表示形式,例如"19901219"。

  • Double 是指远程FTP文件文本中使用Double的字符串表示形式,例如"3.1415"。

  • Boolean 是指远程FTP文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。

  • Date 是指远程FTP文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。

Hbase11XReader 插件文档#

1 快速介绍#

HbaseReader 插件实现了从 Hbase中读取数据。在底层实现上,HbaseReader 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey 范围内的数据,并将读取的数据使用 Addax 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。

以下演示基于下面创建的表以及数据

create 'users', 'address','info'
put 'users', 'lisi', 'address:country', 'china'
put 'users', 'lisi', 'address:province',    'beijing'
put 'users', 'lisi', 'info:age',        27
put 'users', 'lisi', 'info:birthday',   '1987-06-17'
put 'users', 'lisi', 'info:company',    'baidu'
put 'users', 'xiaoming', 'address:city',    'hangzhou'
put 'users', 'xiaoming', 'address:country', 'china'
put 'users', 'xiaoming', 'address:province',    'zhejiang'
put 'users', 'xiaoming', 'info:age',        29
put 'users', 'xiaoming', 'info:birthday',   '1987-06-17'
put 'users', 'xiaoming', 'info:company',    'alibaba'
1.1 支持模式#

目前HbaseReader支持两模式读取:normal 模式、multiVersionFixedColumn模式;

normal 模式#

把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如:

hbase(main):017:0> scan 'users'
ROW           COLUMN+CELL
 lisi         column=address:city, timestamp=1457101972764, value=beijing
 lisi         column=address:country, timestamp=1457102773908, value=china
 lisi         column=address:province, timestamp=1457101972736, value=beijing
 lisi         column=info:age, timestamp=1457101972548, value=27
 lisi         column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi         column=info:company, timestamp=1457101972653, value=baidu
 xiaoming     column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming     column=address:country, timestamp=1457082195729, value=china
 xiaoming     column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming     column=info:age, timestamp=1457082218735, value=29
 xiaoming     column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming     column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds

读取后数据

rowKey addres:city address:country address:province info:age info:birthday info:company
lisi beijing china beijing 27 1987-06-17 baidu
xiaoming hangzhou china zhejiang 29 1987-06-17 alibaba
multiVersionFixedColumn模式#

把HBase中的表,当成竖表进行读取。读出的每条记录一定是四列形式,依次为:rowKeyfamily:qualifiertimestampvalue

读取时需要明确指定要读取的列,把每一个 cell 中的值,作为一条记录(record),若有多个版本就有多条记录(record)。如:

hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW                                   COLUMN+CELL
 lisi                                 column=address:city, timestamp=1457101972764, value=beijing
 lisi                                 column=address:contry, timestamp=1457102773908, value=china
 lisi                                 column=address:province, timestamp=1457101972736, value=beijing
 lisi                                 column=info:age, timestamp=1457101972548, value=27
 lisi                                 column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi                                 column=info:company, timestamp=1457101972653, value=baidu
 xiaoming                             column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming                             column=address:contry, timestamp=1457082195729, value=china
 xiaoming                             column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming                             column=info:age, timestamp=1457082218735, value=29
 xiaoming                             column=info:age, timestamp=1457082178630, value=24
 xiaoming                             column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming                             column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds

读取后数据(4列)

rowKey column:qualifier timestamp value
lisi address:city 1457101972764 beijing
lisi address:contry 1457102773908 china
lisi address:province 1457101972736 beijing
lisi info:age 1457101972548 27
lisi info:birthday 1457101972604 1987-06-17
lisi info:company 1457101972653 beijing
xiaoming address:city 1457082196082 hangzhou
xiaoming address:contry 1457082195729 china
xiaoming address:province 1457082195773 zhejiang
xiaoming info:age 1457082218735 29
xiaoming info:age 1457082178630 24
xiaoming info:birthday 1457082186830 1987-06-17
xiaoming info:company 1457082189826 alibaba
1.2 限制#
  1. 目前不支持动态列的读取。考虑网络传输流量(支持动态列,需要先将hbase所有列的数据读取出来,再按规则进行过滤),现支持的两种读取模式中需要用户明确指定要读取的列。

  2. 关于同步作业的切分:目前的切分方式是根据用户hbase表数据的region分布进行切分。即:在用户填写的 [startrowkey,endrowkey] 范围内,一个region会切分成一个task,单个region不进行切分。

  3. multiVersionFixedColumn模式下不支持增加常量列

2 实现原理#

简而言之,HbaseReader 通过 HBase 的 Java 客户端,通过 HTable, Scan, ResultScanner 等 API,读取你指定 rowkey 范围内的数据,并将读取的数据使用 Addax 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。hbase11xreader与hbase094xreader的主要不同在于API的调用不同,Hbase1.1.x废弃了很多Hbase0.94.x的api。

3 功能说明#

3.1 配置样例#

配置一个从 HBase 抽取数据到本地的作业:(normal 模式)

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hbase11xreader",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "xxxf"
                        },
                        "table": "users",
                        "encoding": "utf-8",
                        "mode": "normal",
                        "column": [
                            {
                                "name": "rowkey",
                                "type": "string"
                            },
                            {
                                "name": "info: age",
                                "type": "string"
                            },
                            {
                                "name": "info: birthday",
                                "type": "date",
                                "format":"yyyy-MM-dd"
                            },
                            {
                                "name": "info: company",
                                "type": "string"
                            },
                            {
                                "name": "address: country",
                                "type": "string"
                            },
                            {
                                "name": "address: province",
                                "type": "string"
                            },
                            {
                                "name": "address: city",
                                "type": "string"
                            }
                        ],
                        "range": {
                            "startRowkey": "",
                            "endRowkey": "",
                            "isBinaryRowkey": true
                        }
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
                        "fileName": "qiran",
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}

配置一个从 HBase 抽取数据到本地的作业:( multiVersionFixedColumn 模式)

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
       "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hbase11xreader",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "127.0.0.1:2181"
            },
            "table": "users",
            "encoding": "utf-8",
            "mode": "multiVersionFixedColumn",
            "maxVersion": "-1",
            "column": [
                {
                    "name": "rowkey",
                    "type": "string"
                },
                {
                    "name": "info: age",
                    "type": "string"
                },
                {
                    "name": "info: birthday",
                    "type": "date",
                    "format":"yyyy-MM-dd"
                },
                {
                    "name": "info: company",
                    "type": "string"
                },
                {
                    "name": "address: contry",
                    "type": "string"
                },
                {
                    "name": "address: province",
                    "type": "string"
                },
                {
                    "name": "address: city",
                    "type": "string"
                }
            ],
            "range": {
              "startRowkey": "",
              "endRowkey": ""
            }
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
            "fileName": "qiran",
            "writeMode": "truncate"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
hbaseConfig 连接HBase集群需要的配置信息,JSON格式, hbase.zookeeper.quorum为必填项,其他 HBase client的配置为可选项
mode 读取hbase的模式,可填写 normalmultiVersionFixedColumn
table 要读取的 hbase 表名(大小写敏感)
encoding UTF-8 编码方式,UTF-8 或是 GBK,用于对二进制存储的 HBase byte[] 转为 String 时的编码
column 要读取的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项, 详细说明见下文
maxVersion 指定在多版本模式下的hbasereader读取的版本数,取值只能为-1或者大于1的数字,-1表示读取所有版本, multiVersionFixedColumn模式下必填
range 指定hbasereader读取的rowkey范围, 详见下文
scanCacheSize 256 Hbase client每次rpc从服务器端读取的行数
scanBatchSize 100 Hbase client每次rpc从服务器端读取的列数

column

描述:要读取的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项。

normal 模式

name指定读取的hbase列,除了rowkey外,必须为 列族:列名 的格式,type指定源数据的类型,format指定日期类型的格式,value指定当前类型为常量,不从hbase读取数据,而是根据value值自动生成对应的列。配置格式如下:

"column":
[
    {
        "name": "rowkey",
        "type": "string"
    },
    {
        "value": "test",
        "type": "string"
    }
]

normal 模式下,对于用户指定Column信息,type必须填写,name/value必须选择其一。

multiVersionFixedColumn 模式

name指定读取的hbase列,除了rowkey外,必须为 列族:列名 的格式,type指定源数据的类型,format指定日期类型的格式 。multiVersionFixedColumn模式下不支持常量列。配置格式如下:

"column": [
    {
        "name": "rowkey",
        "type": "string"
    },
    {
        "name": "info: age",
        "type": "string"
    }
]
range#

指定hbasereader读取的rowkey范围

  • startRowkey:指定开始rowkey

  • endRowkey指定结束rowkey

  • isBinaryRowkey:指定配置的startRowkey和endRowkey转换为byte[]时的方式,默认值为false,若为true,则调用Bytes.toBytesBinary(rowkey)方法进行转换;若为false:则调用Bytes.toBytes(rowkey)

配置格式如下:

"range": {
  "startRowkey": "aaa",
  "endRowkey": "ccc",
  "isBinaryRowkey":false
}
3.3 类型转换#

下面列出支持的读取HBase数据类型,HbaseReader 针对 HBase 类型转换列表:

Addax 内部类型 HBase 数据类型
Long int, short ,long
Double float, double
String string,binarystring
Date date
Boolean boolean

请注意:

除上述罗列字段类型外,其他类型均不支持

Hbase11XReader 插件文档#

HbaseReader 插件实现了从 Hbase中读取数据。在底层实现上,HbaseReader 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 Scan 方式读取你指定 rowkey 范围内的数据, 并将读取的数据使用 Addax 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。

配置#

以下演示基于下面创建的表以及数据

create 'users', {NAME=>'address', VERSIONS=>100},{NAME=>'info',VERSIONS=>1000}
put 'users', 'lisi', 'address:country', 'china1', 20200101
put 'users', 'lisi', 'address:province',    'beijing1', 20200101
put 'users', 'lisi', 'info:age',        27, 20200101
put 'users', 'lisi', 'info:birthday',   '1987-06-17', 20200101
put 'users', 'lisi', 'info:company',    'baidu1', 20200101
put 'users', 'xiaoming', 'address:city',    'hangzhou1', 20200101
put 'users', 'xiaoming', 'address:country', 'china1', 20200101
put 'users', 'xiaoming', 'address:province',    'zhejiang1',20200101
put 'users', 'xiaoming', 'info:age',        29, 20200101
put 'users', 'xiaoming', 'info:birthday',   '1987-06-17',20200101
put 'users', 'xiaoming', 'info:company',    'alibaba1', 20200101
put 'users', 'lisi', 'address:country', 'china2', 20200102
put 'users', 'lisi', 'address:province',    'beijing2', 20200102
put 'users', 'lisi', 'info:age',        27, 20200102
put 'users', 'lisi', 'info:birthday',   '1987-06-17', 20200102
put 'users', 'lisi', 'info:company',    'baidu2', 20200102
put 'users', 'xiaoming', 'address:city',    'hangzhou2', 20200102
put 'users', 'xiaoming', 'address:country', 'china2', 20200102
put 'users', 'xiaoming', 'address:province',    'zhejiang2', 20200102
put 'users', 'xiaoming', 'info:age',        29, 20200102
put 'users', 'xiaoming', 'info:birthday',   '1987-06-17', 20200102
put 'users', 'xiaoming', 'info:company',    'alibaba2', 20200102

目前HbaseReader支持两模式读取:normal 模式、multiVersionFixedColumn模式;

normal 模式#

把HBase中的表,当成普通二维表(横表)进行读取,读取最新版本数据。如:

hbase(main):017:0> scan 'users'
ROW           COLUMN+CELL
 lisi         column=address:city, timestamp=1457101972764, value=beijing
 lisi         column=address:country, timestamp=1457102773908, value=china
 lisi         column=address:province, timestamp=1457101972736, value=beijing
 lisi         column=info:age, timestamp=1457101972548, value=27
 lisi         column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi         column=info:company, timestamp=1457101972653, value=baidu
 xiaoming     column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming     column=address:country, timestamp=1457082195729, value=china
 xiaoming     column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming     column=info:age, timestamp=1457082218735, value=29
 xiaoming     column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming     column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0580 seconds

读取后数据

rowKey addres:city address:country address:province info:age info:birthday info:company
lisi beijing china beijing 27 1987-06-17 baidu
xiaoming hangzhou china zhejiang 29 1987-06-17 alibaba
multiVersionFixedColumn 模式#

把HBase中的表,当成竖表进行读取。读出的每条记录一定是四列形式,依次为:rowKeyfamily:qualifiertimestampvalue

读取时需要明确指定要读取的列,把每一个 cell 中的值,作为一条记录(record),若有多个版本就有多条记录(record)。如:

hbase(main):018:0> scan 'users',{VERSIONS=>5}
ROW                                   COLUMN+CELL
 lisi                                 column=address:city, timestamp=1457101972764, value=beijing
 lisi                                 column=address:contry, timestamp=1457102773908, value=china
 lisi                                 column=address:province, timestamp=1457101972736, value=beijing
 lisi                                 column=info:age, timestamp=1457101972548, value=27
 lisi                                 column=info:birthday, timestamp=1457101972604, value=1987-06-17
 lisi                                 column=info:company, timestamp=1457101972653, value=baidu
 xiaoming                             column=address:city, timestamp=1457082196082, value=hangzhou
 xiaoming                             column=address:contry, timestamp=1457082195729, value=china
 xiaoming                             column=address:province, timestamp=1457082195773, value=zhejiang
 xiaoming                             column=info:age, timestamp=1457082218735, value=29
 xiaoming                             column=info:age, timestamp=1457082178630, value=24
 xiaoming                             column=info:birthday, timestamp=1457082186830, value=1987-06-17
 xiaoming                             column=info:company, timestamp=1457082189826, value=alibaba
2 row(s) in 0.0260 seconds

读取后数据(4列)

rowKey column:qualifier timestamp value
lisi address:city 1457101972764 beijing
lisi address:contry 1457102773908 china
lisi address:province 1457101972736 beijing
lisi info:age 1457101972548 27
lisi info:birthday 1457101972604 1987-06-17
lisi info:company 1457101972653 beijing
xiaoming address:city 1457082196082 hangzhou
xiaoming address:contry 1457082195729 china
xiaoming address:province 1457082195773 zhejiang
xiaoming info:age 1457082218735 29
xiaoming info:age 1457082178630 24
xiaoming info:birthday 1457082186830 1987-06-17
xiaoming info:company 1457082189826 alibaba

配置一个从 HBase 抽取数据到本地的作业:(normal 模式)

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hbase11xreader",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "xxxf"
            },
            "table": "users",
            "encoding": "utf-8",
            "mode": "normal",
            "column": [
              {
                "name": "rowkey",
                "type": "string"
              },
              {
                "name": "info: age",
                "type": "string"
              },
              {
                "name": "info: birthday",
                "type": "date",
                "format": "yyyy-MM-dd"
              },
              {
                "name": "info: company",
                "type": "string"
              },
              {
                "name": "address: country",
                "type": "string"
              },
              {
                "name": "address: province",
                "type": "string"
              },
              {
                "name": "address: city",
                "type": "string"
              }
            ],
            "range": {
              "startRowkey": "",
              "endRowkey": "",
              "isBinaryRowkey": true
            }
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
            "fileName": "qiran",
            "writeMode": "truncate"
          }
        }
      }
    ]
  }
}

配置一个从 HBase 抽取数据到本地的作业:( multiVersionFixedColumn 模式)

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hbase11xreader",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "xxx"
            },
            "table": "users",
            "encoding": "utf-8",
            "mode": "multiVersionFixedColumn",
            "maxVersion": "-1",
            "column": [
              {
                "name": "rowkey",
                "type": "string"
              },
              {
                "name": "info: age",
                "type": "string"
              },
              {
                "name": "info: birthday",
                "type": "date",
                "format": "yyyy-MM-dd"
              },
              {
                "name": "info: company",
                "type": "string"
              },
              {
                "name": "address: contry",
                "type": "string"
              },
              {
                "name": "address: province",
                "type": "string"
              },
              {
                "name": "address: city",
                "type": "string"
              }
            ],
            "range": {
              "startRowkey": "",
              "endRowkey": ""
            }
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/Users/shf/workplace/addax_test/hbase11xreader/result",
            "fileName": "qiran",
            "writeMode": "truncate"
          }
        }
      }
    ]
  }
}

参数说明#

配置项 是否必须 默认值 描述
hbaseConfig 连接HBase集群需要的配置信息,JSON格式, hbase.zookeeper.quorum为必填项,其他 HBase client的配置为可选项
mode 读取hbase的模式,可填写 normalmultiVersionFixedColumn
table 要读取的 hbase 表名(大小写敏感)
encoding UTF-8 编码方式,UTF-8 或是 GBK,用于对二进制存储的 HBase byte[] 转为 String 时的编码
column 要读取的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项, 详细说明见下文
maxVersion 指定在多版本模式下的hbasereader读取的版本数,取值只能为-1或者大于1的数字,-1表示读取所有版本, multiVersionFixedColumn模式下必填
range 指定hbasereader读取的rowkey范围, 详见下文
scanCacheSize 256 Hbase client每次rpc从服务器端读取的行数
scanBatchSize 100 Hbase client每次rpc从服务器端读取的列数
column#

描述:要读取的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项。

normal 模式

name指定读取的hbase列,除了rowkey外,必须为 列族:列名 的格式,type指定源数据的类型,format指定日期类型的格式,value指定当前类型为常量,不从hbase读取数据,而是根据value值自动生成对应的列。配置格式如下:

{
  "column": [
    {
      "name": "rowkey",
      "type": "string"
    },
    {
      "value": "test",
      "type": "string"
    }
  ]
}

normal 模式下,对于用户指定Column信息,type必须填写,name/value必须选择其一。

multiVersionFixedColumn 模式

name指定读取的hbase列,除了rowkey外,必须为 列族:列名 的格式,type指定源数据的类型,format指定日期类型的格式 。multiVersionFixedColumn模式下不支持常量列。配置格式如下:

{
  "mode": "multiVersionFixedColumn",
  "maxVersion": 3,
  "column": [
    {
      "name": "rowkey",
      "type": "string"
    },
    {
      "name": "info: age",
      "type": "string"
    }
  ]
}
range#

指定hbasereader读取的rowkey范围

  • startRowkey:指定开始rowkey

  • endRowkey指定结束rowkey

  • isBinaryRowkey:指定配置的startRowkey和endRowkey转换为byte[]时的方式,默认值为false,若为true,则调用Bytes.toBytesBinary(rowkey)方法进行转换;若为false:则调用Bytes.toBytes(rowkey)

配置格式如下:

{
  "range": {
    "startRowkey": "aaa",
    "endRowkey": "ccc",
    "isBinaryRowkey": false
  }
}

类型转换#

下面列出支持的读取HBase数据类型,HbaseReader 针对 HBase 类型转换列表:

Addax 内部类型 HBase 数据类型
Long int, short ,long
Double float, double
String string,binarystring
Date date
Boolean boolean

请注意:

除上述罗列字段类型外,其他类型均不支持

限制#

  1. 目前不支持动态列的读取。考虑网络传输流量(支持动态列,需要先将hbase所有列的数据读取出来,再按规则进行过滤),现支持的两种读取模式中需要用户明确指定要读取的列。

  2. 关于同步作业的切分:目前的切分方式是根据用户hbase表数据的region分布进行切分。即:在用户填写的 [startrowkey,endrowkey] 范围内,一个region会切分成一个task,单个region不进行切分。

  3. multiVersionFixedColumn模式下不支持增加常量列

hbase11xsqlreader 插件文档#

1 快速介绍#

hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实现上,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并执行相应的sql语句将数据从Phoenix库中SELECT出来。

2 实现原理#

简而言之,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并根据用户配置的信息生成查询SELECT 语句,然后发送到HBase集群,并将返回结果使用Addax自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

3 功能说明#

3.1 配置样例#

配置一个从Phoenix同步抽取数据到本地的作业:

{
    "job": {
        "setting": {
            "speed": {
                "byte":-1,
              "channel": 1
            }
        },  
        "content": [ {
                "reader": {
                    "name": "hbase11xsqlreader",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.quorum": "node1,node2,node3"
                        },  
                        "table": "US_POPULATION",
                        "column": []
                    }
                },  
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
hbaseConfig hbase11xsqlreader需要通过Phoenix客户端去连接hbase集群,因此这里需要填写对应hbase集群的zkurl地址,注意不要添加2181
table 编写Phoenix中的表名,如果有namespace,该值设置为 namespace.tablename
column 填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列
3.3 类型转换#

目前hbase11xsqlreader支持大部分Phoenix类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:

Addax 内部类型 Phoenix 数据类型
String CHAR, VARCHAR
Bytes BINARY, VARBINARY
Bool BOOLEAN
Long INTEGER, TINYINT, SMALLINT, BIGINT
Double FLOAT, DECIMAL, DOUBLE,
Date DATE, TIME, TIMESTAMP

hbase20xsqlreader 插件文档#

1 快速介绍#

hbase20xsqlreader插件实现了从Phoenix(HBase SQL)读取数据,对应版本为HBase2.X和Phoenix5.X。

2 实现原理#

简而言之,hbase20xsqlreader通过Phoenix轻客户端去连接Phoenix QueryServer,并根据用户配置信息生成查询SELECT 语句,然后发送到QueryServer读取HBase数据,并将返回结果使用Addax自定义的数据类型拼装为抽象的数据集,最终传递给下游Writer处理。

3 功能说明#

3.1 配置样例#

配置一个从Phoenix同步抽取数据到本地的作业:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "hbase20xsqlreader",
          "parameter": {
            "queryServerAddress": "http://127.0.0.1:8765",
            "serialization": "PROTOBUF",
            "table": "TEST",
            "column": [
              "ID",
              "NAME"
            ],
            "splitKey": "ID"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": -1
      }
    }
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
queryServerAddress Phoenix QueryServer 地址, 该插件通过 PQS 进行连接
serialization PROTOBUF QueryServer使用的序列化协议
table 所要读取表名
schema 表所在的schema
column 全部列 填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列
splitKey 根据数据特征动态指定切分点,对表数据按照指定的列的最大、最小值进行切分,仅支持整型和字符串类型
splitPoints 按照表的split进行切分
where 支持对表查询增加过滤条件,每个切分都会携带该过滤条件
querySql 支持指定多个查询语句,但查询列类型和数目必须保持一致
3.3 类型转换#

目前hbase20xsqlreader支持大部分Phoenix类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:

Addax 内部类型 Phoenix 数据类型
String CHAR, VARCHAR
Bytes BINARY, VARBINARY
Bool BOOLEAN
Long INTEGER, TINYINT, SMALLINT, BIGINT
Double FLOAT, DECIMAL, DOUBLE,
Date DATE, TIME, TIMESTAMP

4 约束限制#

  • 切分表时切分列仅支持单个列,且该列必须是表主键

  • 不设置splitPoint默认使用自动切分,此时切分列仅支持整形和字符型

  • 表名和SCHEMA名及列名大小写敏感,请与Phoenix表实际大小写保持一致

  • 仅支持通过Phoenix QeuryServer读取数据,因此您的Phoenix必须启动QueryServer服务才能使用本插件

Addax HdfsReader 插件文档#

1 快速介绍#

HdfsReader提供了读取分布式文件系统数据存储的能力。在底层实现上,HdfsReader获取分布式文件系统上文件的数据,并转换为Addax传输协议传递给Writer。

目前HdfsReader支持的文件格式如下:

  • textfile(text)

  • orcfile(orc)

  • rcfile(rc)

  • sequence file(seq)

  • Csv(csv)

  • parquet

2 功能与限制#

HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为Addax协议的功能。

textfile 是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于Addax而言,HdfsReader实现上类比TxtFileReader,有诸多相似之处。

orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。

据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。HdfsReader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前HdfsReader支持的功能如下:

  1. 支持textfile、orcfile、parquet、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。

  2. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量

  3. 支持递归读取、支持正则表达式(*?)。

  4. 支持常见的压缩算法,包括 GZIP, SNAPPY, ZLIB等。

  5. 多个File可以支持并发读取。

  6. 支持sequence file数据压缩,目前支持lzo压缩方式。

  7. csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。

  8. 目前插件中Hive版本为 3.1.1,Hadoop版本为3.1.1, 在Hadoop 2.7.x, Hadoop 3.1.x 和Hive 2.x, hive 3.1.x 测试环境中写入正常;其它版本理论上都支持,但在生产环境使用前,请进一步测试;

  9. 支持kerberos 认证

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/user/hive/warehouse/mytable01/*",
            "defaultFS": "hdfs://xxx:port",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": 1,
                "type": "boolean"
              },
              {
                "type": "string",
                "value": "hello"
              },
              {
                "index": 2,
                "type": "double"
              }
            ],
            "fileType": "orc",
            "encoding": "UTF-8",
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}
3.2 配置项说明(各个配置项值前后不允许有空格)#
配置项 是否必须 默认值
path
defaultFS
fileType
column 默认类型为 String
fieldDelimiter ,
encoding utf-8
nullFormat
haveKerberos
kerberosKeytabFilePath
kerberosPrincipal
compress
hadoopConfig
path#

要读取的文件路径,如果要读取多个文件,可以使用正则表达式 *,注意这里可以支持填写多个路径:

  1. 当指定单个Hdfs文件,HdfsReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取。

  2. 当指定多个Hdfs文件,HdfsReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。

  3. 当指定通配符,HdfsReader尝试遍历出多个文件信息。例如: 指定 /* 代表读取 / 目录下所有的文件,指定 /bazhen/* 代表读取 bazhen 目录下游所有的文件。HdfsReader目前只支持 *? 作为文件通配符。

特别需要注意的是,Addax会将一个作业下同步的所有的文件视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。并且提供给Addax权限可读。

defaultFS#

Hadoop hdfs文件系统namenode节点地址,如果 hdfs 配置了 HA 模式,则为 defaultFS 的值

目前HdfsReader已经支持Kerberos认证,如果需要权限认证,则需要用户配置kerberos参数,见下面

fileType#

描述:文件的类型,目前只支持用户配置为

  • text 表示textfile文件格式

  • orc 表示orcfile文件格式

  • rc 表示rcfile文件格式

  • seq 表示sequence file文件格式

  • csv 表示普通hdfs文件格式(逻辑二维表)

  • parquet 表示parquetfile文件格式

特别需要注意的是,HdfsReader能够自动识别文件是orcfile、textfile或者还是其它类型的文件,但该项是必填项,HdfsReader则会只读取用户配置的类型的文件,忽略路径下其他格式的文件

另外需要注意的是,由于textfile和orcfile是两种完全不同的文件格式,所以HdfsReader对这两种文件的解析方式也存在差异,这种差异导致hive支持的复杂复合类型(比如map,array,struct,union)在转换为Addax支持的String类型时,转换的结果格式略有差异,比如以map类型为例:

  • orcfile map类型经hdfsreader解析转换成addax支持的string类型后,结果为 {job=80, team=60, person=70}

  • textfile map类型经hdfsreader解析转换成addax支持的string类型后,结果为 job:80,team:60,person:70

从上面的转换结果可以看出,数据本身没有变化,但是表示的格式略有差异,所以如果用户配置的文件路径中要同步的字段在Hive中是复合类型的话,建议配置统一的文件格式。

如果需要统一复合类型解析出来的格式,我们建议用户在hive客户端将 textfile 格式的表导成orcfile 格式的表

column#

读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。

默认情况下,用户可以全部按照String类型读取数据,配置如下:

"column": ["*"]

用户可以指定Column字段信息,配置如下:

{
  "type": "long",
  "index": 0
  //从本地文件文本第一列获取int字段
},
{
"type": "string",
"value": "alibaba"  //HdfsReader内部生成alibaba的字符串字段作为当前字段
}

对于用户指定Column信息,type必须填写,index/value必须选择其一。

fieldDelimiter#

读取的字段分隔符, HdfsReader在读取textfile数据时,需要指定字段分割符,如果不指定默认为 , ,HdfsReader在读取 orcfile 时,用户无需指定字段分割符

encoding#

读取文件的编码配置

nullFormat#

文本文件中无法使用标准字符串定义null(空指针),Addax提供nullFormat定义哪些字符串可以表示为null。

例如如果用户配置: "\\N" ,那么如果源头数据是 "\N" ,Addax视作 null 字段。

haveKerberos#

是否有Kerberos认证,默认 false, 如果用户配置true,则配置项 kerberosKeytabFilePathkerberosPrincipal 为必填。

kerberosKeytabFilePath#

Kerberos认证 keytab文件路径,绝对路径

kerberosPrincipal#

描述:Kerberos认证Principal名,如 xxxx/hadoopclient@xxx.xxx

compress#

当fileType(文件类型)为csv下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snappy、framing-snappy压缩;值得注意的是,lzo存在两种压缩格式:lzo和lzo_deflate,用户在配置的时候需要留心,不要配错了;另外,由于snappy目前没有统一的stream format,addax目前只支持最主流的两种:hadoop-snappy(hadoop上的snappy stream format)和 framing-snappy(google建议的snappy stream format);

hadoopConfig#

hadoopConfig 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置

{
  "hadoopConfig": {
    "dfs.nameservices": "cluster",
    "dfs.ha.namenodes.cluster": "nn1,nn2",
    "dfs.namenode.rpc-address.cluster.nn1": "node1.example.com:8020",
    "dfs.namenode.rpc-address.cluster.nn2": "node2.example.com:8020",
    "dfs.client.failover.proxy.provider.cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
  }
}

这里的 cluster 表示 HDFS 配置成HA时的名字,也是 defaultFS 配置项中的名字 如果实际环境中的名字不是 cluster ,则上述配置中所有写有 cluster 都需要替换

csvReaderConfig#

读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。

常见配置:

"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}

所有配置项及默认值,配置时 csvReaderConfig 的map中请 严格按照以下字段名字进行配置

boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
3.3 类型转换#

由于textfile和orcfile文件表的元数据信息由Hive维护并存放在Hive自己维护的数据库(如mysql)中,目前HdfsReader不支持对Hive元数

据数据库进行访问查询,因此用户在进行类型转换的时候,必须指定数据类型,如果用户配置的column为 *,则所有column默认转换为

string类型。HdfsReader提供了类型转换的建议表如下:

Addax 内部类型 Hive表 数据类型
Long TINYINT, SMALLINT, INT, BIGINT
Double FLOAT, DOUBLE
String String, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, BINARY
Boolean BOOLEAN
Date Date, TIMESTAMP
Bytes BINARY

其中:

  • Long 是指Hdfs文件文本中使用整形的字符串表示形式,例如 123456789

  • Double 是指Hdfs文件文本中使用Double的字符串表示形式,例如 3.1415

  • Boolean 是指Hdfs文件文本中使用Boolean的字符串表示形式,例如 truefalse。不区分大小写。

  • Date 是指Hdfs文件文本中使用Date的字符串表示形式,例如 2014-12-31

  • Bytes 是指HDFS文件中使用二进制存储的内容,比如一张图片的数据

特别提醒:

  • Hive支持的数据类型 TIMESTAMP 可以精确到纳秒级别,所以 textfile、orcfile 中 TIMESTAMP 存放的数据类似于 2015-08-21 22:40:47.397898389,如果转换的类型配置为Addax的Date,转换之后会导致纳秒部分丢失,所以如果需要保留纳秒部分的数据,请配置转换类型为Addax的String类型。

3.4 按分区读取#

Hive在建表的时候,可以指定分区partition,例如创建分区partition(day="20150820",hour="09"),对应的hdfs文件系统中,相应的表的目录下则会多出/20150820和/09两个目录,且/20150820是/09的父目录。了解了分区都会列成相应的目录结构,在按照某个分区读取某个表所有数据时,则只需配置好json中path的值即可。

比如需要读取表名叫mytable01下分区day为20150820这一天的所有数据,则配置如下:

"path": "/user/hive/warehouse/mytable01/20150820/*"

5 约束限制#

6 FAQ#

  1. 如果报java.io.IOException: Maximum column length of 100,000 exceeded in column...异常信息,说明数据源column字段长度超过了100000字符。

需要在json的reader里增加如下配置

"csvReaderConfig": {
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}

safetySwitch = false; //单列长度不限制100000字符

HttpReader#

HttpReader 插件实现了读取Restful API 数据的能力

示例#

示例接口与数据#

以下配置演示了如何从一个指定的 API 中获取数据,假定访问的接口为:

http://127.0.0.1:9090/mock/17/LDJSC/ASSET

走 GET 请求,请求的参数有

参数名称 参数值示例
CURR_DATE 2021-01-17
DEPT 9400
USERNAME andi

以下是访问的数据样例,(实际返回数据略有不同)

{
  "result": [
    {
      "CURR_DATE": "2019-12-09",
      "DEPT": "9700",
      "TOTAL_MANAGED_MARKET_VALUE": 1581.03,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 36.75,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.009448781026677719,
      "TMMARKET_VALUE_GROWTH_MON": -0.015153586011995693,
      "TMMARKET_VALUE_GROWTH_YEAR": 0.0652347643813081,
      "TMMARKET_VALUE_SHARECOM": 0.024853621341525287,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.005242133578517903,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1645.1193961136973,
      "YEAR_NEW_ASSET_SSHARECOM": 0.16690149257388515,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.017886267801303465,
      "POTENTIAL_LOST_ASSETS": 56.76,
      "TOTAL_LIABILITIES": 57.81,
      "TOTAL_ASSETS": 1306.33,
      "TOTAL_ASSETS_DOD_GROWTH": 4.79,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": -0.006797058194980485,
      "NEW_ASSETS_DAY": 14.92,
      "NEW_ASSETS_MON": 90.29,
      "NEW_ASSETS_YEAR": 297.32,
      "NEW_ASSETS_DOD_GROWTH_RATE": -0.04015576541561927,
      "NEW_FUNDS_DAY": 18.16,
      "INFLOW_FUNDS_DAY": 2.12,
      "OUTFLOW_FUNDS_DAY": 9.73,
      "OVERALL_POSITION": 0.810298404938773,
      "OVERALL_POSITION_DOD_GROWTH_RATE": -0.03521615634095476,
      "NEW_CUST_FUNDS_MON": 69.44,
      "INFLOW_FUNDS_MONTH": 62.26,
      "OUTFLOW_FUNDS_MONTH": 32.59
    },
    {
      "CURR_DATE": "2019-08-30",
      "DEPT": "8700",
      "TOTAL_MANAGED_MARKET_VALUE": 1596.74,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 41.86,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": 0.03470208565515685,
      "TMMARKET_VALUE_GROWTH_MON": 0.07818120801111743,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.05440250244736409,
      "TMMARKET_VALUE_SHARECOM": 0.09997733019626448,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.019726478499825697,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1007.9314679742108,
      "YEAR_NEW_ASSET_SSHARECOM": 0.15123738798885086,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.04694052069678048,
      "POTENTIAL_LOST_ASSETS": 52.48,
      "TOTAL_LIABILITIES": 55.28,
      "TOTAL_ASSETS": 1366.72,
      "TOTAL_ASSETS_DOD_GROWTH": 10.12,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": 0.009708491982487952,
      "NEW_ASSETS_DAY": 12.42,
      "NEW_ASSETS_MON": 41.14,
      "NEW_ASSETS_YEAR": 279.32,
      "NEW_ASSETS_DOD_GROWTH_RATE": -0.025878627161898062,
      "NEW_FUNDS_DAY": 3.65,
      "INFLOW_FUNDS_DAY": 14.15,
      "OUTFLOW_FUNDS_DAY": 17.08,
      "OVERALL_POSITION": 0.9098432997243932,
      "OVERALL_POSITION_DOD_GROWTH_RATE": 0.02111922282868306,
      "NEW_CUST_FUNDS_MON": 57.21,
      "INFLOW_FUNDS_MONTH": 61.16,
      "OUTFLOW_FUNDS_MONTH": 15.83
    },
    {
      "CURR_DATE": "2019-06-30",
      "DEPT": "6501",
      "TOTAL_MANAGED_MARKET_VALUE": 1506.72,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": -13.23,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.0024973354204176554,
      "TMMARKET_VALUE_GROWTH_MON": -0.015530793150701896,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.08556724628979398,
      "TMMARKET_VALUE_SHARECOM": 0.15000077963967678,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.049629446804825755,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1250.1040863177336,
      "YEAR_NEW_ASSET_SSHARECOM": 0.19098445630488178,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": -0.007881179708853471,
      "POTENTIAL_LOST_ASSETS": 50.53,
      "TOTAL_LIABILITIES": 56.62,
      "TOTAL_ASSETS": 1499.53,
      "TOTAL_ASSETS_DOD_GROWTH": 29.56,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": -0.02599813232345556,
      "NEW_ASSETS_DAY": 28.81,
      "NEW_ASSETS_MON": 123.24,
      "NEW_ASSETS_YEAR": 263.63,
      "NEW_ASSETS_DOD_GROWTH_RATE": 0.0073986669331394875,
      "NEW_FUNDS_DAY": 18.52,
      "INFLOW_FUNDS_DAY": 3.26,
      "OUTFLOW_FUNDS_DAY": 6.92,
      "OVERALL_POSITION": 0.8713692113306709,
      "OVERALL_POSITION_DOD_GROWTH_RATE": 0.02977644553289545,
      "NEW_CUST_FUNDS_MON": 85.14,
      "INFLOW_FUNDS_MONTH": 23.35,
      "OUTFLOW_FUNDS_MONTH": 92.95
    },
    {
      "CURR_DATE": "2019-12-07",
      "DEPT": "8705",
      "TOTAL_MANAGED_MARKET_VALUE": 1575.85,
      "TOTAL_MANAGED_MARKET_VALUE_GROWTH": 8.94,
      "TMMARKET_VALUE_DOD_GROWTH_RATE": -0.04384846980627058,
      "TMMARKET_VALUE_GROWTH_MON": -0.022962456288549656,
      "TMMARKET_VALUE_GROWTH_YEAR": -0.005047009316021089,
      "TMMARKET_VALUE_SHARECOM": 0.07819484815809447,
      "TMMARKET_VALUE_SHARE_GROWTH_RATE": -0.008534369960890256,
      "AVERAGE_NEW_ASSETS_DAYINMON": 1340.0339240689955,
      "YEAR_NEW_ASSET_SSHARECOM": 0.19019952857677042,
      "YN_ASSET_SSHARECOM_GROWTH_RATE": 0.01272353909992914,
      "POTENTIAL_LOST_ASSETS": 54.63,
      "TOTAL_LIABILITIES": 53.17,
      "TOTAL_ASSETS": 1315.08,
      "TOTAL_ASSETS_DOD_GROWTH": 49.31,
      "TOTAL_ASSETS_DOD_GROWTH_RATE": 0.0016538407028265922,
      "NEW_ASSETS_DAY": 29.17,
      "NEW_ASSETS_MON": 44.75,
      "NEW_ASSETS_YEAR": 172.87,
      "NEW_ASSETS_DOD_GROWTH_RATE": 0.045388692595736746,
      "NEW_FUNDS_DAY": 18.46,
      "INFLOW_FUNDS_DAY": 12.93,
      "OUTFLOW_FUNDS_DAY": 10.38,
      "OVERALL_POSITION": 0.8083127036694828,
      "OVERALL_POSITION_DOD_GROWTH_RATE": -0.02847453515632541,
      "NEW_CUST_FUNDS_MON": 49.74,
      "INFLOW_FUNDS_MONTH": 81.93,
      "OUTFLOW_FUNDS_MONTH": 18.17
    }
  ]
}

我们需要把 result 结果中的部分 key 值数据获取

配置#

以下配置实现从接口获取数据并打印到终端

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "httpreader",
          "parameter": {
            "connection": [
              {
                "url": "http://127.0.0.1:9090/mock/17/LDJSC/ASSET",
                "proxy": {
                    "host": "http://127.0.0.1:3128",
                    "auth": "user:pass"
                }
              }
            ],
            "reqParams": {
              "CURR_DATE":"2021-01-18",
              "DEPT":"9700"
            },
            "resultKey":"result",
            "method": "GET",
            "column": ["CURR_DATE","DEPT","TOTAL_MANAGED_MARKET_VALUE","TOTAL_MANAGED_MARKET_VALUE_GROWTH"],
            "username": "user",
            "password": "passw0rd",
            "headers": {
                "X-Powered-by": "Addax"
            }
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": "true"
          }
        }
      }
    ]
  }
}

将上述内容保存为 job/httpreader2stream.json 文件。

执行#

执行以下命令,进行采集

bin/addax.py job/httpreader2stream.json

上述命令的输出结果大致如下:

2021-01-20 09:07:41.864 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-01-20 09:07:41.877 [main] INFO  Engine - the machine info  =>

	osInfo: 	Mac OS X x86_64 10.15.1
	jvmInfo:	AdoptOpenJDK 14 14.0.2+12
	cpu num:	8

	totalPhysicalMemory:	-0.00G
	freePhysicalMemory:	-0.00G
	maxFileDescriptorCount:	-1
	currentOpenFileDescriptorCount:	-1

	GC Names	[G1 Young Generation, G1 Old Generation]

	MEMORY_NAME                    | allocation_size                | init_size
	CodeHeap 'profiled nmethods'   | 117.21MB                       | 2.44MB
	G1 Old Gen                     | 2,048.00MB                     | 39.00MB
	G1 Survivor Space              | -0.00MB                        | 0.00MB
	CodeHeap 'non-profiled nmethods' | 117.21MB                       | 2.44MB
	Compressed Class Space         | 1,024.00MB                     | 0.00MB
	Metaspace                      | -0.00MB                        | 0.00MB
	G1 Eden Space                  | -0.00MB                        | 25.00MB
	CodeHeap 'non-nmethods'        | 5.57MB                         | 2.44MB


2021-01-20 09:07:41.903 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"reqParams":{
						"CURR_DATE":"2021-01-18",
						"DEPT":"9700"
					},
					"method":"GET",
					"column":[
						"CURR_DATE",
						"DEPT",
						"TOTAL_MANAGED_MARKET_VALUE",
						"TOTAL_MANAGED_MARKET_VALUE_GROWTH"
					],
					"resultKey":"result",
					"connection":[
						{
							"url":"http://127.0.0.1:9090/mock/17/LDJSC/ASSET"
						}
					]
				},
				"name":"httpreader"
			},
			"writer":{
				"parameter":{
					"print":"true"
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"speed":{
			"bytes":-1,
			"channel":1
		}
	}
}

2021-01-20 09:07:41.926 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-01-20 09:07:41.927 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-01-20 09:07:41.928 [main] INFO  JobContainer - Set jobId = 0
2021-01-20 09:07:42.002 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started

2019-08-30	9700	1539.85	-14.78
2019-10-01	9700	1531.71	47.66
2020-12-03	9700	1574.38	7.34
2020-11-31	9700	1528.13	41.62
2019-03-01	9700	1554.28	-9.29

2021-01-20 09:07:45.006 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-01-20 09:07:41
任务结束时刻                    : 2021-01-20 09:07:44
任务总计耗时                    :                  3s
任务平均流量                    :               42B/s
记录写入速度                    :              1rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0
参数说明#
配置项 是否必须 数据类型 默认值 说明
url string 要访问的HTTP地址
reqParams map 接口请求参数
resultKey string 要获取结果的那个key值,如果是获取整个返回值,则可以不用填写
method string get 请求模式,仅支持GET,POST两种,不区分大小写
column list 要获取的key,如果配置为 "*" ,则表示获取所有key的值
username string 接口请求需要的认证帐号(如有)
password string 接口请求需要的密码(如有)
proxy map 代理地址,详见下面描述
headers map 定制的请求头信息
proxy#

如果访问的接口需要通过代理,则可以配置 proxy 配置项,该配置项是一个 json 字典,包含一个必选的 host 字段和一个可选的 auth 字段。

{
  "proxy": {
    "host": "http://127.0.0.1:8080",
    "auth": "user:pass"
  }
}

如果是 sock 代理 (V4,v5),则可以写

{
  "proxy": {
    "host": "socks://127.0.0.1:8080",
    "auth": "user:pass"
  }
}

host 是代理地址,包含代理类型,目前仅支持 http 代理和 socks(V4, V5均可) 代理。 如果代理需要认证,则可以配置 auth , 它由 用户名和密码组成,两者之间用冒号(:) 隔开。

限制说明#
  1. 返回的结果必须是JSON类型

  2. 当前所有key的值均当作字符串类型

  3. 暂不支持接口Token鉴权模式

  4. 暂不支持分页获取

  5. 代理仅支持 http 模式

InfluxDBReader#

InfluxDBReader 插件实现了从 InfluxDB 读取数据。底层实现上,是通过调用 InfluQL 语言查询表,然后获得返回数据。

示例#

以下示例用来演示该插件如何从指定表(即指标)上读取数据并输出到终端

创建需要的库表和数据#

通过以下命令来创建需要读取的表以及数据

# create database
influx --execute "CREATE DATABASE NOAA_water_database"
# download sample data
curl https://s3.amazonaws.com/noaa.water-database/NOAA_data.txt -o NOAA_data.txt
# import data via influx-cli
influx -import -path=NOAA_data.txt -precision=s -database=NOAA_water_database
创建 job 文件#

创建 job/influxdb2stream.json 文件,内容如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "influxdbreader",
          "parameter": {
            "column": [
              "*"
            ],
            "connection": [
              {
                "endpoint": "http://localhost:8086",
                "database": "NOAA_water_database",
                "table": "h2o_feet",
                "where": "1=1"
              }
            ],
            "connTimeout": 15,
            "readTimeout": 20,
            "writeTimeout": 20,
            "username": "influx",
            "password": "influx123"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": "true"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}
运行#

执行下面的命令进行数据采集

bin/addax.py job/influxdb2stream.json

参数说明#

配置项 是否必须 数据类型 默认值 描述
endpoint string
username string 数据源的用户名
password string 数据源指定用户名的密码
database string 数据源指定的数据库
table string 所选取的需要同步的表名
column list 所配置的表中需要同步的列名集合,详细描述rdbmreader
connTimeout int 15 设置连接超时值,单位为秒
readTimeout int 20 设置读取超时值,单位为秒
writeTimeout int 20 设置写入超时值,单位为秒
where 针对表的筛选条件
querySql 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项

类型转换#

当前实现是将所有字段当作字符串处理

限制#

  1. 当前插件仅支持 1.x 版本,2.0 及以上并不支持

JsonFileReader 插件文档#

1 快速介绍#

JsonFileReader 提供了读取本地文件系统数据存储的能力。在底层实现上,JsonFileReader获取本地文件数据,使用Jayway JsonPath抽取Json字符串,并转换为Addax传输协议传递给Writer。

2 功能与限制#

JsonFileReader实现了从本地文件读取数据并转为Addax协议的功能,本地文件是可以是Json数据格式的集合,对于Addax而言,JsonFileReader实现上类比TxtFileReader,有诸多相似之处。目前JsonFileReader支持功能如下:

  1. 支持且仅支持读取TXT的文件,且要求TXT中s内容必须符合json

  2. 支持列常量和Json的Key为空值

  3. 支持递归读取、支持文件名过滤

  4. 多个File可以支持并发读取

我们暂时不能做到:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。

  2. 单个File在压缩情况下,从技术上无法支持多线程并发读取。

  3. 暂不支持读取压缩文件和日期类型的自定义日期

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": "true"
          }
        },
        "reader": {
          "name": "jsonfilereader",
          "parameter": {
            "path": [
              "/tmp/test*.json"
            ],
            "column": [
              {
                "index": "$.id",
                "type": "long"
              },
              {
                "index": "$.name",
                "type": "string"
              },
              {
                "index": "$.age",
                "type": "long"
              },
              {
                "index": "$.score.math",
                "type": "double"
              },
              {
                "index": "$.score.english",
                "type": "double"
              },
              {
                "index": "$.pubdate",
                "type": "date"
              },
              {
                "type": "string",
                "value": "constant string"
              }
            ]
          }
        }
      }
    ]
  }
}

其中 /tmp/test*.json 为同一个 json 文件的多个复制,内容如下:

{
  "name": "zhangshan",
  "id": 19890604,
  "age": 12,
  "score": {
    "math": 92.5,
    "english": 97.5,
    "chinese": 95
  },
  "pubdate": "2020-09-05"
}
3.2 参数说明#
配置项 是否必须 默认值 描述
path 本地文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下文
column 默认String类型 读取字段列表,type指定源数据的类型,详见下文
fieldDelimiter , 描述:读取的字段分隔符
compress 文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为zip、gzip、bzip2
encoding utf-8 读取文件的编码配置
path#

本地文件系统的路径信息,注意这里可以支持填写多个路径

  • 当指定单个本地文件,JsonFileReader暂时只能使用单线程进行数据抽取。

  • 当指定多个本地文件,JsonFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定。

  • 当指定通配符,JsonFileReader尝试遍历出多个文件信息。例如: 指定/* 代表读取/目录下所有的文件,指定/bazhen/* 代表读取bazhen目录下游所有的文件。 JsonFileReader目前只支持 * 作为文件通配符。

特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。

column#

读取字段列表,type指定源数据的类型,index指定当前列来自于json的指定,语法为 Jayway JsonPath 的语法,value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。 用户必须指定Column字段信息

对于用户指定Column信息,type必须填写,index/value必须选择其一

3.3 类型转换#

本地文件本身不提供数据类型,该类型是Addax JsonFileReade定义:

Addax 内部类型 本地文件 数据类型
Long Long
Double Double
String String
Boolean Boolean
Date Date

KuduReader#

KuduReader 插件利用 Kudu 的java客户端KuduClient进行Kudu的读操作。

KuduReader通过Datax框架从Kudu并行的读取数据,通过主控的JOB程序按照指定的规则对Kudu中的数据进行分片,并行读取,然后将Kudu支持的类型通过逐一判断转换成Datax支持的类型。

示例#

我们通过 Trinokudu connector 连接 kudu 服务,然后进行表创建以及数据插入

建表语句以及数据插入语句#
CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar,
  age int,
  salary double,
  longtitue decimal(18,6),
  latitude decimal(18,6),
  p decimal(21,20),
  mtime timestamp
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

insert into kudu.default.users 
values 
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double), 
 cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double), 
 cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)), 
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 03:40:41');
-- ONLY insert primary key value
 insert into kudu.default.users(user_id) values  (3);
配置#

以下是读取kudu表并输出到终端的配置

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "kudureader",
          "parameter": {
            "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
            "table": "users",
            "splitPk": "user_id",
            "lowerBound": 1,
            "upperBound": 100,
            "readTimeout": 5,
            "scanTimeout": 10
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

把上述配置文件保存为 job/kudu2stream.json

执行#

执行下面的命令进行采集

bin/addax.py job/kudu2stream.json

输出结果类似如下(删除了不必需要的内容)

2021-01-10 15:46:59.303 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl

2021-01-10 15:46:59.329 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"masterAddress":"localhost:7051,localhost:7151,localhost:7251",
					"upperBound":100,
					"readTimeout":5,
					"lowerBound":1,
					"splitPk":"user_id",
					"table":"users",
					"scanTimeout":10
				},
				"name":"kudureader"
			},
			"writer":{
				"parameter":{
					"print":true
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"channel":3
		}
	}
}

3	null	null	null	null	null	null	null
1	wgzhao	18	18888.88	123.282424	23.123456	1.12345678912345678912	2021-01-10 22:40:41
2	anglina	16	23456.12	33.192123	56.654321	1.12345678912345678912	2021-01-10 11:40:41

任务启动时刻                    : 2021-01-10 15:46:59
任务结束时刻                    : 2021-01-10 15:47:02
任务总计耗时                    :                  3s
任务平均流量                    :               52B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
masterAddress 必须 string Kudu Master集群RPC地址,多个地址用逗号(,)分隔
table 必须 string kudu 表名
splitPk string 并行读取数据分片字段
lowerBound string 并行读取数据分片范围下界
upperBound string 并行读取数据分片范围上界
readTimeout int 10 读取数据超时(秒)
scanTimeout int 20 数据扫描请求超时(秒)

类型转换#

Addax 内部类型 Kudu 数据类型
Long byte, short, int, long
Double float, double, decimal
String string
Date timestamp
Boolean boolean
Bytes binary

MongoDBReader 插件文档#

1 快速介绍#

MongoDBReader 插件利用 MongoDB 的java客户端MongoClient进行MongoDB的读操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到document级别,配合上MongoDB强大的索引功能,基本可以达到高性能的读取MongoDB的需求。

2 实现原理#

MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的JOB程序按照指定的规则对MongoDB中的数据进行分片,并行读取,然后将MongoDB支持的类型通过逐一判断转换成Datax支持的类型。

3 功能说明#

3.1 配置样例#

该示例从MongoDB中读一张表并打印到终端

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mongodbreader",
          "parameter": {
            "address": [
              "127.0.0.1:32768"
            ],
            "userName": "",
            "userPassword": "",
            "dbName": "tag_per_data",
            "collectionName": "tag_data",
            "column": [
              {
                "name": "unique_id",
                "type": "string"
              },
              {
                "name": "sid",
                "type": "string"
              },
              {
                "name": "user_id",
                "type": "string"
              },
              {
                "name": "auction_id",
                "type": "string"
              },
              {
                "name": "content_type",
                "type": "string"
              },
              {
                "name": "pool_type",
                "type": "string"
              },
              {
                "name": "frontcat_id",
                "type": "Array",
                "spliter": ""
              },
              {
                "name": "categoryid",
                "type": "Array",
                "spliter": ""
              },
              {
                "name": "gmt_create",
                "type": "string"
              },
              {
                "name": "taglist",
                "type": "Array",
                "spliter": " "
              },
              {
                "name": "property",
                "type": "string"
              },
              {
                "name": "scorea",
                "type": "int"
              },
              {
                "name": "scoreb",
                "type": "int"
              },
              {
                "name": "scorec",
                "type": "int"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": "true"
          }
        }
      }
    ]
  }
}
4 参数说明#
配置项 是否必须 默认值 描述
address MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出
userName MongoDB的用户名
userPassword MongoDB的密码
collectionName MonogoDB的集合名
column MongoDB的文档列名
name Column的名字
type Column的类型
splitter 指定 MongoDB数组转为字符串的分隔符
5 类型转换#
Addax 内部类型 MongoDB 数据类型
Long int, Long
Double double
String string, array
Date date
Boolean boolean
Bytes bytes

MysqlReader#

MysqlReader插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。

不同于其他关系型数据库,MysqlReader不支持FetchSize

MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用Addax自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。

示例#

我们在 MySQL 的 test 库上创建如下表:

CREATE TABLE addax_reader
(
    c_bigint     bigint,
    c_varchar    varchar(100),
    c_timestamp  timestamp,
    c_text       text,
    c_decimal    decimal(8, 3),
    c_mediumtext mediumtext,
    c_longtext   longtext,
    c_int        int,
    c_time       time,
    c_datetime   datetime,
    c_enum       enum('one', 'two', 'three'),
    c_float      float,
    c_smallint   smallint,
    c_bit        bit,
    c_double     double,
    c_blob       blob,
    c_char       char(5),
    c_varbinary  varbinary(100),
    c_tinyint    tinyint,
    c_json       json,
    c_set SET ('a', 'b', 'c', 'd'),
    c_binary     binary,
    c_longblob   longblob,
    c_mediumblob mediumblob
);

然后插入下面一条记录

```sql
INSERT INTO addax_reader
VALUES (2E18,
        'a varchar data',
        '2021-12-12 12:12:12',
        'a long text',
        12345.122,
        'a medium text',
        'a long text',
        2 ^ 32 - 1,
        '12:13:14',
        '2021-12-12 12:13:14',
        'one',
        17.191,
        126,
        0,
        1114.1114,
        'blob',
        'a123b',
        'a var binary content',
        126,
        '{"k1":"val1","k2":"val2"}',
        'b',
        binary(1),
        x '89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200',
        x '89504E470D0A1A0A0000000D');

下面的配置是读取该表到终端的作业:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "addax_reader"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://127.0.0.1:3306/test"
                ],
                "driver": "com.mysql.jdbc.Driver"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/mysql2stream.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/mysql2stream.json

参数说明#

配置项 是否必须 类型 默认值 描述
jdbcUrl list 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
driver string 自定义驱动类名,解决兼容性问题,详见下面描述
username string 数据源的用户名
password string 数据源指定用户名的密码
table list 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column list 所配置的表中需要同步的列名集合,详细描述rdbmreader
splitPk string 使用splitPk代表的字段进行数据分片,详细描述见rdbmreader
autoPk bool false 是否自动猜测分片主键,3.2.6 版本引入
where string 针对表的筛选条件
querySql list 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项
driver#

当前 Addax 采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver,而不是 com.mysql.jdbc.Driver。 如果你需要采集的 MySQL 服务低于 5.6,需要使用到 Connector/J 5.1 驱动,则可以采取下面的步骤:

替换插件内置的驱动

rm -f plugin/reader/mysqlreader/lib/mysql-connector-java-*.jar

拷贝老的驱动到插件目录

cp mysql-connector-java-5.1.48.jar plugin/reader/mysqlreader/lib/

指定驱动类名称

在你的 json 文件类,配置 "driver": "com.mysql.jdbc.Driver"

类型转换#

目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出MysqlReader针对Mysql类型转换列表:

Addax 内部类型 MySQL 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

请注意:

  • 除上述罗列字段类型外,其他类型均不支持

  • tinyint(1) Addax视作为整形

  • year Addax视作为字符串类型

  • bit Addax属于未定义行为

3.4 数据库编码问题#

Mysql本身的编码设置非常灵活,包括指定编码到库、表、字段级别,甚至可以均不同编码。优先级从高到低为字段、表、库、实例。我们不推荐数据库用户设置如此混乱的编码,最好在库级别就统一到UTF-8。

MysqlReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此MysqlReader不需用户指定编码,可以自动获取编码并转码。

对于Mysql底层写入编码和其设定的编码不一致的混乱情况,MysqlReader对此无法识别,对此也无法提供解决方案,对于这类情况,导出有可能为乱码

OracleReader 插件文档#

1 快速介绍#

OracleReader插件实现了从Oracle读取数据。在底层实现上,OracleReader通过JDBC连接远程Oracle数据库,并执行相应的sql语句将数据从Oracle库中SELECT出来。

2 实现原理#

简而言之,OracleReader通过JDBC连接器连接到远程的Oracle数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程Oracle数据库, 并将该SQL执行返回结果使用Addax自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,OracleReader将其拼接为SQL语句发送到Oracle数据库;对于用户配置querySql信息,Oracle直接将其发送到Oracle数据库。

3 功能说明#

3.1 配置样例#

配置一个从Oracle数据库同步抽取数据到本地的作业:

{
  "job": {
    "setting": {
      "speed": {
        "byte": 1048576,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "id",
              "name"
            ],
            "splitPk": "db_id",
            "connection": [
              {
                "table": [
                  "table"
                ],
                "jdbcUrl": [
                  "jdbc:oracle:thin:@<HOST_NAME>:PORT:<DATABASE_NAME>"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述见rdbmsreader
splitPk 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
autoPk false 是否自动猜测分片主键,3.2.6 版本引入
where 针对表的筛选条件
querySql 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项
fetchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM
session 针对本地连接,修改会话配置,详见下文
session#

控制写入数据的时间格式,时区等的配置,如果表中有时间字段,配置该值以明确告知写入 oracle 的时间格式。通常配置的参数为:NLS_DATE_FORMAT,NLS_TIME_FORMAT。其配置的值为 json 格式,例如:

"session": [
"alter session set NLS_DATE_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set NLS_TIMESTAMP_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set NLS_TIMESTAMP_TZ_FORMAT='yyyy-mm-dd hh24:mi:ss'",
"alter session set TIME_ZONE='Asia/Chongqing'"
]

注意 &quot;" 的转义字符串

3.3 类型转换#

目前OracleReader支持大部分Oracle类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出OracleReader针对Oracle类型转换列表:

Addax 内部类型 Oracle 数据类型
Long NUMBER, INTEGER, INT, SMALLINT
Double NUMERIC, DECIMAL, FLOAT, DOUBLE PRECISION, REAL
String LONG ,CHAR, NCHAR, VARCHAR, VARCHAR2, NVARCHAR2, CLOB, NCLOB, CHARACTER, CHARACTER VARYING, CHAR VARYING, NATIONAL CHARACTER, NATIONAL CHAR, NATIONAL CHARACTER VARYING, NATIONAL CHAR VARYING, NCHAR VARYING
Date TIMESTAMP, DATE
Boolean bit, bool
Bytes BLOB, BFILE, RAW, LONG RAW

请注意: 除上述罗列字段类型外,其他类型均不支持

数据库编码问题#

OracleReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此OracleReader不需用户指定编码,可以自动获取编码并转码。

对于Oracle底层写入编码和其设定的编码不一致的混乱情况,OracleReader对此无法识别,对此也无法提供解决方案,对于这类情况,导出有可能为乱码

PostgresqlReader#

PostgresqlReader插件实现了从PostgreSQL读取数据。在底层实现上,PostgresqlReader通过JDBC连接远程PostgreSQL数据库,并执行相应的sql语句将数据从PostgreSQL库中SELECT出来。

示例#

假定建表语句以及输入插入语句如下:

create table if not exists addax_tbl 
(
    c_bigint bigint,
    c_bit bit(3),
    c_bool boolean,
    c_byte bytea,
    c_char char(10),
    c_varchar varchar(20),
    c_date  date,
    c_double float8,
    c_int integer,
    c_json json,
    c_number decimal(8,3),
    c_real  real,
    c_small smallint,
    c_text  text,
    c_ts timestamp,
    c_uuid uuid,
    c_xml xml,
    c_money money,
    c_inet inet,
    c_cidr cidr,
    c_macaddr macaddr
);
insert into addax_tbl
values (999988887777,
        B '101',
        TRUE,
        '\xDEADBEEF',
        'hello',
        'hello, world',
        '2021-01-04',
        999888.9972,
        9876542,
        '{"bar": "baz", "balance": 7.77, "active": false}'::json,
        12345.123,
        123.123,
        126,
        'this is a long text ',
        '2020-01-04 12:13:14',
        'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11'::uuid,
        '<foo>bar</foo>'::xml,
        '52093.89'::money,
        '192.168.1.1'::inet,
        '192.168.1/24'::cidr,
        '08002b:010203'::macaddr);

配置一个从PostgreSQL数据库同步抽取数据到本地的作业:

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "pgtest",
            "password": "pgtest",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "addax_tbl"
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://127.0.0.1:5432/pgtest"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/postgres2stream.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/postgres2stream.json

其输出信息如下(删除了非关键信息)

2021-01-07 10:15:12.295 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"password":"*****",
					"column":[
						"*"
					],
					"connection":[
						{
							"jdbcUrl":[
								"jdbc:postgresql://localhost:5432/pgtest"
							],
							"table":[
								"addax_tbl"
							]
						}
					],
					"username":"pgtest"
				},
				"name":"postgresqlreader"
			},
			"writer":{
				"parameter":{
					"print":true
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"speed":{
			"byte":-1,
			"channel":1
		}
	}
}

999988887777	101	true   	ޭ��	hello     	hello, world	2021-01-04	999888.99719999998	9876542	{"bar": "baz", "balance": 7.77, "active": false}	12345.123	123.123	126	this is a long text 	2020-01-04 12:13:14	a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11	<foo>bar</foo>	52093.89	192.168.1.1	192.168.1.0/24	08:00:2b:01:02:03

任务启动时刻                    : 2021-01-07 10:15:12
任务结束时刻                    : 2021-01-07 10:15:15
任务总计耗时                    :                  3s
任务平均流量                    :               90B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   1
读写失败总数                    :                   0

参数说明#

配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述见rdbmsreader
splitPk 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
autoPk false 是否自动猜测分片主键,3.2.6 版本引入
where 针对表的筛选条件
querySql 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项
fetchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM

类型转换#

目前PostgresqlReader支持大部分PostgreSQL类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出PostgresqlReader针对PostgreSQL类型转换列表:

Addax 内部类型 PostgreSQL 数据类型
Long bigint, bigserial, integer, smallint, serial
Double double precision, money, numeric, real
String varchar, char, text, bit(>1), inet, cidr, macaddr, array,uuid,json,xml
Date date, time, timestamp
Boolean bool,bit(1)
Bytes bytea

已知限制#

除上述罗列字段类型外,其他类型均不支持;

RDBMS Reader#

RDBMSReader 插件支持从传统 RDBMS 读取数据。这是一个通用关系数据库读取插件,可以通过注册数据库驱动等方式支持更多关系数据库读取。

同时 RDBMS Reader 又是其他关系型数据库读取插件的的基础类。以下读取插件均依赖该插件

  • Oracle Reader

  • MySQL Reader

  • PostgreSQL Reader

  • ClickHouse Reader

  • SQLServer Reader

配置说明#

以下配置展示了如何从 Presto 数据库读取数据到终端

{
  "job": {
    "setting": {
      "speed": {
        "byte": 1048576,
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "hive",
            "password": "",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "default.table"
                ],
                "jdbcUrl": [
                  "jdbc:presto://127.0.0.1:8080/hive"
                ],
                "driver": ""
              }
            ],
            "fetchSize": 1024,
            "where": "1 = 1"
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

参数说明#

parameter 配置项支持以下配置

配置项 是否必须 数据类型 默认值 描述
jdbcUrl array 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
driver string 自定义驱动类名,解决兼容性问题,详见下面描述
username string 数据源的用户名
password string 数据源指定用户名的密码
table array 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column array 所配置的表中需要同步的列名集合,详细描述见后
splitPk string 使用splitPk代表的字段进行数据分片,Addax因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能,注意事项见后
autoPk bool false 是否自动猜测分片主键,3.2.6 版本引入,详见后面描述
where string 针对表的筛选条件
querySql string 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项
fetchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM
jdbcUrl#

jdbcUrl 配置除了配置必要的信息外,我们还可以在增加每种特定驱动的特定配置属性,这里特别提到我们可以利用配置属性对代理的支持从而实现通过代理访问数据库的功能。 比如对于 PrestoSQL 数据库的 JDBC 驱动而言,支持 socksProxy 参数,于是上述配置的 jdbcUrl 可以修改为

jdbc:presto://127.0.0.1:8080/hive?socksProxy=192.168.1.101:1081

大部分关系型数据库的 JDBC 驱动支持 socksProxyHost,socksProxyPort 参数来支持代理访问。也有一些特别的情况。

以下是各类数据库 JDBC 驱动所支持的代理类型以及配置方式

数据库 代理类型 代理配置 例子
MySQL socks socksProxyHost,socksProxyPort socksProxyHost=192.168.1.101&socksProxyPort=1081
Presto socks socksProxy socksProxy=192.168.1.101:1081
Presto http httpProxy httpProxy=192.168.1.101:3128
driver#

大部分情况下,一个数据库的JDBC驱动是固定的,但有些因为版本的不同,所建议的驱动类名不同,比如 MySQL。 新的 MySQL JDBC 驱动类型推荐使用 com.mysql.cj.jdbc.Driver 而不是以前的 com.mysql.jdbc.Drver。如果想要使用就的驱动名称,则可以配置 driver 配置项。

column#

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 ["*"]

支持列裁剪,即列可以挑选部分列进行导出。

支持列换序,即列可以不按照表schema信息进行导出。

支持常量配置,用户需要按照JSON格式:

["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]

  • id 为普通列名

  • `table` 为包含保留在的列名,

  • 1 为整形数字常量,

  • 'bazhen.csy'为字符串常量

  • null 为空指针,注意,这里的 null 必须以字符串形式出现,即用双引号引用

  • to_char(a + 1)为表达式,

  • 2.3 为浮点数,

  • true 为布尔值,同样的,这里的布尔值也必须用双引号引用

Column必须显示填写,不允许为空!

splitPk#

RdbmsReader 进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,Addax 因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

推荐 splitPk 用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

目前 splitPk 仅支持整形、字符串型数据(ASCII类型) 切分,不支持浮点、日期等其他类型。 如果用户指定其他非支持类型,RDBMSReader 将报错!

splitPk如果不填写,将视作用户不对单表进行切分,RDBMSReader 使用单通道同步全量数据。

autoPk#

3.2.6 版本开始,支持自动获取表主键或唯一索引,如果设置为 true ,RdbmsReader 将尝试通过查询数据库的元数据信息获取指定表的主键字段或唯一索引字段,如果获取可用于分隔的 字段不止一个,则默认取第一个。后续将会考虑优先取整数类型。

该特性目前支持的数据库有:

  • ClickHouse

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

3.3 类型转换#

目前 RDBMSReader 支持大部分通用得关系数据库类型如数字、字符等,但也存在部分个别类型没有支持的情况,请注意检查你的类型,根据具体的数据库做选择。

Addax 内部类型 RDBMS 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year,xml
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

4. 当前支持的数据库#

RedisReader 插件文档#

1 快速介绍#

RedisReader 提供了读取Redis RDB 的能力。在底层实现上获取本地RDB文件/Redis Server数据,并转换为Addax传输协议传递给Writer。

2 功能与限制#

  1. 支持读取本地RDB/http RDB/redis server RDB的文件并转换成redis dump格式。

  2. 支持过滤 DB/key名称过滤

我们暂时不能做到:

  1. 单个RDB支持多线程并发读取。

  2. Redis Server 未开启sync命令。

  3. 读取rdb文件转换成json数据。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "redisreader",
          "parameter": {
            "connection": [
              {
                "uri": "file:///root/dump.rdb",
                "uri": "http://localhost/dump.rdb",
                "uri": "tcp://127.0.0.1:7001",
                "uri": "tcp://127.0.0.1:7002",
                "uri": "tcp://127.0.0.1:7003",
                "auth": "password"
              }
            ],
            "include": [
              "^user"
            ],
            "exclude": [
              "^password"
            ],
            "db": [
              0,
              1
            ]
          }
        },
        "writer": {
          "name": "rediswriter",
          "parameter": {
            "connection": [
              {
                "uri": "tcp://127.0.0.1:6379",
                "auth": "123456"
              }
            ],
            "timeout": 60000
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
uri redis链接,支持多个本地rdb文件/网络rdb文件,如果是集群,填写所有master节点地址
db 需要读取的db索引,若不填写,则读取所有db
include 要包含的 key, 支持正则表达式
exclude 要排除的 key,支持正则表达式

5 约束限制#

  1. 不支持直接读取任何不支持sync命令的redis server,如果需要请备份的rdb文件进行读取。

  2. 如果是原生redis cluster集群,请填写所有master节点的tcp地址,redisreader插件会自动dump 所有节点的rdb文件。

  3. 仅解析 String 数据类型,其他复合类型(Sets, List 等会忽略)

SqlServerReader 插件文档#

1 快速介绍#

SqlServerReader插件实现了从SqlServer读取数据。在底层实现上,SqlServerReader通过JDBC连接远程SqlServer数据库,并执行相应的sql语句将数据从SqlServer库中SELECT出来。

2 实现原理#

简而言之,SqlServerReader通过JDBC连接器连接到远程的SqlServer数据库,并根据用户配置的信息生成查询SELECT SQL语句并发送到远程SqlServer数据库,并将该SQL执行返回结果使用Addax自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,SqlServerReader将其拼接为SQL语句发送到SqlServer数据库;对于用户配置querySql信息,SqlServer直接将其发送到SqlServer数据库。

3 功能说明#

3.1 配置样例#

配置一个从SqlServer数据库同步抽取数据到本地的作业:

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "sqlserverreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "splitPk": "db_id",
            "connection": [
              {
                "table": [
                  "table"
                ],
                "jdbcUrl": [
                  "jdbc:sqlserver://localhost:3433;DatabaseName=dbname"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述见rdbmsreader
splitPk 使用splitPk代表的字段进行数据分片,详细描述见rdbms
autoPk false 是否自动猜测分片主键,3.2.6 版本引入
where 针对表的筛选条件
querySql 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项
fetchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM
3.3 类型转换#

目前SqlServerReader支持大部分SqlServer类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出SqlServerReader针对SqlServer类型转换列表:

Addax 内部类型 SqlServer 数据类型
Long bigint, int, smallint, tinyint
Double float, decimal, real, numeric
String char,nchar,ntext,nvarchar,text,varchar,nvarchar(MAX),varchar(MAX)
Date date, datetime, time
Boolean bit
Bytes binary,varbinary,varbinary(MAX),timestamp, image

请注意:

  • 除上述罗列字段类型外,其他类型均不支持

  • timestamp类型作为二进制类型

StreamReader#

StreamReader 是一个从内存读取数据的插件, 他主要用来快速生成期望的数据并对写入插件进行测试

一个完整的 StreamReader 配置文件如下:

{
  "reader": {
    "name": "streamreader",
    "parameter": {
      "column": [
        {
          "value": "unique_id",
          "type": "string"
        },
        {
          "value": "1989-06-04 08:12:13",
          "type": "date",
          "dateFormat": "yyyy-MM-dd HH:mm:ss"
        },
        {
          "value": 1984,
          "type": "long"
        },
        {
          "value": 1989.64,
          "type": "double"
        },
        {
          "value": true,
          "type": "bool"
        },
        {
          "value": "a long text",
          "type": "bytes"
        }
      ],
      "sliceRecordCount": 10
    }
  }
}

上述配置文件将会生成 10条记录(假定channel为1),每条记录的内容如下:

unique_id,'1989-06-04 08:12:13',1984,1989.64,true,'a long text'

目前 StreamReader 支持的输出数据类型全部列在上面,分别是:

  • string 字符类型

  • date 日期类型

  • long 所有整型类型

  • double 所有浮点数

  • bool 布尔类型

  • bytes 字节类型

其中 date 类型还支持 dateFormat 配置,用来指定输入的日期的格式,默认为 yyyy-MM-dd HH:mm:ss。比如你的输入可以这样:

{
  "value": "1989/06/04 12:13:14",
  "type": "date",
  "dateFormat": "yyyy/MM/dd HH:mm:ss"
}

注意,日期类型不管输入是何种格式,内部都转为 yyyy-MM-dd HH:mm:ss 格式。

StreamReader 还支持随机输入功能,比如我们要随机得到0-10之间的任意一个整数,我们可以这样配置列:

{
  "random": "0,10",
  "type": "long"
}

这里使用 random 这个关键字来表示其值为随机值,其值的范围为左右闭区间。

其他类型的随机类型配置如下:

  • long: random 0, 10 0到10之间的随机数字

  • string: random 0, 10 0到10长度之间的随机字符串

  • bool: random 0, 10 false 和 true出现的比率

  • double: random 0, 10 0到10之间的随机浮点数

  • date: random '2014-07-07 00:00:00', '2016-07-07 00:00:00' 开始时间->结束时间之间的随机时间,日期格式默认(不支持逗号)yyyy-MM-dd HH:mm:ss

  • BYTES: random 0, 10 0到10长度之间的随机字符串获取其UTF-8编码的二进制串

StreamReader 还支持递增函数,比如我们要得到一个从1开始,每次加5的等差数列,可以这样配置:

{
  "incr": "1,5",
  "type": "long"
}

如果需要获得一个递减的数列,则把第二个参数的步长(上例中的5)改为负数即可。步长默认值为1。

当前递增函数仅支持整数类型(即 long),后续考虑支持日期类型

配置项 sliceRecordCount 用来指定要生成的数据条数,如果指定的 channel,则实际生成的记录数为 sliceRecordCount * channel

TDengineReader#

TDengineReader 插件实现了从涛思公司的 TDengine 读取数据。在底层实现上,TDengineReader 通过JDBC JNI 驱动连接远程 TDengine 数据库, 并执行相应的sql语句将数据从 TDengine 库中批量获。

不同于其他关系型数据库,TDengine 不支持FetchSize

前置条件#

考虑到性能问题,该插件使用了 TDengine 的 JDBC-JNI 驱动, 该驱动直接调用客户端 API(libtaos.so 或 taos.dll)将写入和查询请求发送到taosd 实例。因此在使用之前需要配置好动态库链接文件。

首先将 plugin/reader/tdenginereader/libs/libtaos.so.2.0.16.0 拷贝到 /usr/lib64 目录,然后执行下面的命令创建软链接

ln -sf /usr/lib64/libtaos.so.2.0.16.0 /usr/lib64/libtaos.so.1
ln -sf /usr/lib64/libtaos.so.1 /usr/lib64/libtaos.so

示例#

TDengine 数据自带了一个演示数据库 taosdemo,我们从演示数据库读取部分数据并打印到终端

以下是配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "tdenginereader",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:TAOS://127.0.0.1:6030/test"
                ],
                "querySql": [
                  "select * from test.meters where ts <'2017-07-14 10:40:02' and  loc='beijing' limit 10"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/tdengine2stream.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/tdengine2stream.json

命令输出类似如下:

2021-02-20 15:32:23.161 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-20 15:32:23.229 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"password":"*****",
					"connection":[
						{
							"querySql":[
								"select * from test.meters where ts <'2017-07-14 10:40:02' and  loc='beijing' limit 100"
							],
							"jdbcUrl":[
								"jdbc:TAOS://127.0.0.1:6030/test"
							]
						}
					],
					"username":"root"
				},
				"name":"tdenginereader"
			},
			"writer":{
				"parameter":{
					"print":true
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"channel":3
		}
	}
}

2021-02-20 15:32:23.277 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-20 15:32:23.278 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-20 15:32:23.281 [main] INFO  JobContainer - Set jobId = 0
java.library.path:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
....
2021-02-20 15:32:23.687 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select * from test.meters where ts <'2017-07-14 10:40:02' and  loc='beijing' limit 100
] jdbcUrl:[jdbc:TAOS://127.0.0.1:6030/test].
2021-02-20 15:32:23.692 [0-0-0-reader] WARN  DBUtil - current database does not supoort TYPE_FORWARD_ONLY/CONCUR_READ_ONLY
2021-02-20 15:32:23.740 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select * from test.meters where ts <'2017-07-14 10:40:02' and  loc='beijing' limit 100
] jdbcUrl:[jdbc:TAOS://127.0.0.1:6030/test].

1500000001000	5	5	0	1	beijing
1500000001000	0	6	2	1	beijing
1500000001000	7	0	0	1	beijing
1500000001000	8	9	6	1	beijing
1500000001000	9	9	1	1	beijing
1500000001000	8	2	0	1	beijing
1500000001000	4	5	5	3	beijing
1500000001000	3	3	3	3	beijing
1500000001000	5	4	8	3	beijing
1500000001000	9	4	6	3	beijing

2021-02-20 15:32:26.689 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-20 15:32:23
任务结束时刻                    : 2021-02-20 15:32:26
任务总计耗时                    :                  3s
任务平均流量                    :              800B/s
记录写入速度                    :             33rec/s
读出记录总数                    :                 100
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
jdbcUrl list 对端数据库的JDBC连接信息,注意这里的 TAOS 必须大写
username string 数据源的用户名
password string 数据源指定用户名的密码
table list 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column list 所配置的表中需要同步的列名集合,详细描述rdbmreader
where string 针对表的筛选条件
querySql list 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 tablecolumn这些配置项

类型转换#

目前 TDenginereader 支持 TDengine 所有类型,具体如下

Addax 内部类型 TDengine 数据类型
Long SMALLINT, TINYINT, INT, BIGINT, TIMESTAMP
Double FLOAT, DOUBLE
String BINARY, NCHAR
Boolean BOOL

当前支持版本#

TDengine 2.0.16

注意事项#

  • TDengine JDBC-JNI 驱动和动态库版本要求一一匹配,因此如果你的数据版本并不是 2.0.16,则需要同时替换动态库和插件目录中的JDBC驱动

Addax TxtFileReader 说明#

1 快速介绍#

TxtFileReader提供了读取本地文件系统数据存储的能力。在底层实现上,TxtFileReader获取本地文件数据,并转换为Addax传输协议传递给Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。

2 功能与限制#

TxtFileReader实现了从本地文件读取数据并转为Addax协议的功能,本地文件本身是无结构化数据存储,对于Addax而言,TxtFileReader实现上类比OSSReader,有诸多相似之处。目前TxtFileReader支持功能如下:

  1. 支持且仅支持读取TXT的文件,且要求TXT中shema为一张二维表。

  2. 支持类CSV格式文件,自定义分隔符。

  3. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量

  4. 支持递归读取、支持文件名过滤。

  5. 支持文本压缩,且自动猜测压缩格式

  6. 多个File可以支持并发读取。

我们暂时不能做到:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。

  2. 单个File在压缩情况下,从技术上无法支持多线程并发读取。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": [
              "/tmp/data"
            ],
            "encoding": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": 1,
                "type": "boolean"
              },
              {
                "index": 2,
                "type": "double"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "date",
                "format": "yyyy.MM.dd"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/tmp/result",
            "fileName": "txt_",
            "writeMode": "truncate",
            "format": "yyyy-MM-dd"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
path 本地文件系统的路径信息,注意这里可以支持填写多个路径,详细描述见下文
column 默认String类型 读取字段列表,type指定源数据的类型,详见下文
fieldDelimiter , 描述:读取的字段分隔符
encoding utf-8 读取文件的编码配置
skipHeader false 类CSV格式文件可能存在表头为标题情况,需要跳过。默认不跳过
csvReaderConfig 读取CSV类型文件参数配置,Map类型。不配置则使用默认值,详见下文
path#

本地文件系统的路径信息,注意这里可以支持填写多个路径。

  • 当指定单个本地文件,TxtFileReader暂时只能使用单线程进行数据抽取。二期考虑在非压缩文件情况下针对单个File可以进行多线程并发读取

  • 当指定多个本地文件,TxtFileReader支持使用多线程进行数据抽取。线程并发数通过通道数指定

  • 当指定通配符,TxtFileReader尝试遍历出多个文件信息。例如: 指定 /*代表读取 / 目录下所有的文件,指定 /bazhen/* 代表读取bazhen目录下游所有的文件。目前只支持 * 作为文件通配符。

特别需要注意的是,Addax会将一个作业下同步的所有Text File视作同一张数据表。用户必须自己保证所有的File能够适配同一套schema信息。读取文件用户必须保证为类CSV格式,并且提供给Addax权限可读。

特别需要注意的是,如果Path指定的路径下没有符合匹配的文件抽取,Addax将报错。

从 3.2.3 版本起, path 下允许混合不同压缩格式的文件,插件会尝试自动猜测压缩格式并自动解压,目前支持的压缩格式有:

  • zip

  • bzip2

  • gzip

  • LZ4

  • PACK200

  • XZ

  • Compress

column#

读取字段列表,type指定源数据的类型,index指定当前列来自于文本第几列(以0开始),value指定当前类型为常量,不从源头文件读取数据,而是根据value值自动生成对应的列。

默认情况下,用户可以全部按照String类型读取数据,配置如下:

{
  "column": [
    "*"
  ]
}

用户可以指定Column字段信息,配置如下:

[
  {
    "type": "long",
    "index": 0
  },
  {
    "type": "string",
    "value": "alibaba"
  }
]

对于用户指定Column信息,type必须填写,index/value必须选择其一。

csvReaderConfig#

读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值。

常见配置:

{
  "csvReaderConfig": {
    "safetySwitch": false,
    "skipEmptyRecords": false,
    "useTextQualifier": false
  }
}

所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置

boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
3.3 类型转换#

本地文件本身不提供数据类型,该类型是Addax TxtFileReader定义:

Addax 内部类型 本地文件 数据类型
Long Long
Double Double
String String
Boolean Boolean
Date Date

其中:

  • Long 是指本地文件文本中使用整形的字符串表示形式,例如"19901219"。

  • Double 是指本地文件文本中使用Double的字符串表示形式,例如"3.1415"。

  • Boolean 是指本地文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。

  • Date 是指本地文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。

写入插件#

本章描述 Addax 目前支持的数据写入插件

CassandraWriter 插件文档#

1 快速介绍#

CassandraWriter插件实现了向Cassandra写入数据。在底层实现上,CassandraWriter通过datastax的java driver连接Cassandra实例,并执行相应的cql语句将数据写入cassandra中。

2 实现原理#

简而言之,CassandraWriter通过java driver连接到Cassandra实例,并根据用户配置的信息生成INSERT CQL语句,然后发送到Cassandra。

对于用户配置Table、Column的信息,CassandraReader将其拼接为CQL语句发送到Cassandra。

3 功能说明#

3.1 配置样例#

配置一个从内存产生到Cassandra导入的作业:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 5,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "name",
                "type": "string"
              },
              {
                "value": "false",
                "type": "bool"
              },
              {
                "value": "1988-08-08 08:08:08",
                "type": "date"
              },
              {
                "value": "addr",
                "type": "bytes"
              },
              {
                "value": 1.234,
                "type": "double"
              },
              {
                "value": 12345678,
                "type": "long"
              },
              {
                "value": 2.345,
                "type": "double"
              },
              {
                "value": 3456789,
                "type": "long"
              },
              {
                "value": "4a0ef8c0-4d97-11d0-db82-ebecdb03ffa5",
                "type": "string"
              },
              {
                "value": "value",
                "type": "bytes"
              },
              {
                "value": "-838383838,37377373,-383883838,27272772,393993939,-38383883,83883838,-1350403181,817650816,1630642337,251398784,-622020148",
                "type": "string"
              }
            ],
            "sliceRecordCount": 10000000
          }
        },
        "writer": {
          "name": "cassandrawriter",
          "parameter": {
            "host": "localhost",
            "port": 9042,
            "useSSL": false,
            "keyspace": "stresscql",
            "table": "dst",
            "batchSize": 10,
            "column": [
              "name",
              "choice",
              "date",
              "address",
              "dbl",
              "lval",
              "fval",
              "ival",
              "uid",
              "value",
              "listval"
            ]
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
host Cassandra连接点的域名或ip,多个node之间用逗号分隔
port 9042 Cassandra端口
username 数据源的用户名
password 数据源指定用户名的密码
useSSL false 是否使用SSL连接
connectionsPerHost 8 客户端连接池配置:与服务器每个节点建多少个连接
maxPendingPerConnection 128 客户端连接池配置:每个连接最大请求数
keyspace 需要同步的表所在的keyspace
table 所选取的需要同步的表
column 所配置的表中需要同步的列集合,内容可以是列的名称或 writetime()。如果将列名配置为 writetime(),会将这一列的内容作为时间戳
consistancyLevel LOCAL_QUORUM 数据一致性级别, 可选 ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|LOCAL_ONE
batchSize 1 一次批量提交(UNLOGGED BATCH)的记录数大小(条数)
3.3 类型转换#

目前CassandraReader支持除counter和Custom类型之外的所有类型。

下面列出CassandraReader针对Cassandra类型转换列表:

Addax 内部类型 Cassandra 数据类型
Long int, tinyint, smallint,varint,bigint,time
Double float, double, decimal
String ascii,varchar, text,uuid,timeuuid,duration,list,map,set,tuple,udt,inet
Date date, timestamp
Boolean bool
Bytes blob

请注意:

目前不支持counter类型和custom类型。

4 约束限制#

batchSize#
  1. 不能超过65535

  2. batch中的内容大小受到服务器端 batch_size_fail_threshold_in_kb 的限制。

  3. 如果batch中的内容超过了 batch_size_warn_threshold_in_kb 的限制,会打出warn日志,但并不影响写入,忽略即可。

  4. 如果批量提交失败,会把这个批量的所有内容重新逐条写入一遍。

ClickHouseWriter#

ClickHouseWriter 插件实现了写入数据ClickHouse。在底层实现上,ClickHouseWriter 通过 JDBC 连接远程 ClickHouse 数据库,并执行相应的 insert into .... 语句将数据插入到ClickHouse库中。

示例#

以下示例我们演示从 clickhouse 中读取一张表的内容,并写入到相同表结构的另外一张表中,用来测试插件所支持的数据结构

表结构以数据#

假定要读取的表结构及数据如下:

CREATE TABLE ck_addax (
    c_int8 Int8,
    c_int16 Int16,
    c_int32 Int32,
    c_int64 Int64,
    c_uint8 UInt8,
    c_uint16 UInt16,
    c_uint32 UInt32,
    c_uint64 UInt64,
    c_float32 Float32,
    c_float64 Float64,
    c_decimal Decimal(38,10),
    c_string String,
    c_fixstr FixedString(36),
    c_uuid UUID,
    c_date Date,
    c_datetime DateTime('Asia/Chongqing'),
    c_datetime64 DateTime64(3, 'Asia/Chongqing'),
    c_enum Enum('hello' = 1, 'world'=2)
) ENGINE = MergeTree() ORDER BY (c_int8, c_int16) SETTINGS index_granularity = 8192;

insert into ck_addax values(
    127,
    -32768,
    2147483647,
    -9223372036854775808,
    255,
    65535,
    4294967295,
    18446744073709551615,
    0.999999999999,
    0.99999999999999999,
    1234567891234567891234567891.1234567891,
    'Hello String',
    '2c:16:db:a3:3a:4f',
    '5F042A36-5B0C-4F71-ADFD-4DF4FCA1B863',
    '2021-01-01',
    '2021-01-01 00:00:00',
    '2021-01-01 00:00:00',
    'hello'
);

要写入的表采取和读取表结构相同,其建表语句如下:

create table ck_addax_writer as ck_addax;

配置#

以下为配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "writer": {
          "name": "clickhousewriter",
          "parameter": {
            "username": "default",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "ck_addax_writer"
                ],
                "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default"
              }
            ],
            "preSql": ["alter table @table delete where 1=1"]
          }
        },
        "reader": {
          "name": "clickhousereader",
          "parameter": {
            "username": "default",
            "column": [
              "*"
            ],
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:clickhouse://127.0.0.1:8123/"
                ],
                "table":["ck_addax"]
              }
            ]
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/clickhouse2clickhouse.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/clickhouse2clickhouse.json

参数说明#

配置项 是否必须 默认值 描述
jdbcUrl ClickHouse JDBC 连接信息 ,可按照官方规范填写连接附件控制信息。具体请参看ClickHouse官方文档
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表 ,当配置为多张表时,用户自己需保证多张表是同一schema结构
column 所配置的表中需要同步的列名集合, 使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 "['*']"
batchSize 2048 每次批量数据的条数

DbfFileWriter 插件文档#

1 快速介绍#

DbfFileWriter提供了向本地文件写入类dbf格式的一个或者多个表文件。DbfFileWriter服务的用户主要在于Addax开发、测试同学。

写入本地文件内容存放的是一张dbf表,例如dbf格式的文件信息。

2 功能说明#

2.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "Addax",
                "type": "string"
              },
              {
                "value": 19880808,
                "type": "long"
              },
              {
                "value": "1989-06-04 00:00:00",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "中文测试",
                "type": "string"
              }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "dbffilewriter",
          "parameter": {
            "column": [
              {
                "name": "col1",
                "type": "char",
                "length": 100
              },
              {
                "name": "col2",
                "type": "numeric",
                "length": 18,
                "scale": 0
              },
              {
                "name": "col3",
                "type": "date"
              },
              {
                "name": "col4",
                "type": "logical"
              },
              {
                "name": "col5",
                "type": "char",
                "length": 100
              }
            ],
            "fileName": "test.dbf",
            "path": "/tmp/out",
            "writeMode": "truncate",
            "encoding": "GBK"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
path DBF文件目录,注意这里是文件夹,不是文件
column 类型默认为String 所配置的表中需要同步的列集合, 是 {type: value}{type: index} 的集合
fileName DbfFileWriter写入的文件名
writeMode DbfFileWriter写入前数据清理处理模式,支持 truncate, append, nonConflict 三种模式,详见如下
encoding UTF-8 DBF文件编码,比如 GBK, UTF-8
nullFormat \N 定义哪个字符串可以表示为null,
dateFormat 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd"
writeMode#

DbfFileWriter写入前数据清理处理模式:

  • truncate,写入前清理目录下一fileName前缀的所有文件。

  • append,写入前不做任何处理,Addax DbfFileWriter直接使用filename写入,并保证文件名不冲突。

  • nonConflict,如果目录下有fileName前缀的文件,直接报错。

3.3 类型转换#

当前该插件支持写入的类型以及对应关系如下:

XBase Type XBase Symbol Java Type used in JavaDBF
Character C java.lang.String
Numeric N java.math.BigDecimal
Floating Point F java.math.BigDecimal
Logical L java.lang.Boolean
Date D java.util.Date

其中:

  • numeric 是指本地文件中使用数字类型表示形式,例如"19901219",整形小数位数为0。

  • logical 是指本地文件文本中使用Boolean的表示形式,例如"true"、"false"。

  • Date 是指本地文件文本中使用Date表示形式,例如"2014-12-31",Date是JAVA语言的DATE类型。

DorisWriter#

DorisWriter 插件实现了以流式方式加载数据到 Doris ,其实现上是通过访问 Doris http 连接(8030),然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Doris 是一个兼容 MySQL 协议的数据库后端,因此读取 Doris 可以使用 MySQLReader 进行访问。

示例#

假定要写入的表的建表语句如下:

CREATE DATABASE example_db;
CREATE TABLE example_db.table1
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": [
      {
        "writer": {
          "name": "doriswriter",
          "parameter": {
            "username": "test",
            "password": "123456",
            "batchSize": 1024,
            "connection": [
              {
                "table": "table1",
                "database": "example_db",
                "endpoint": "http://127.0.0.1:8030/"
              }
            ]
          }
        },
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random": "1,500",
                "type": "long"
              },
              {
                "random": "1,127",
                "type": "long"
              },
              {
                "value": "this is a text",
                "type": "string"
              },
              {
                "random": "5,200",
                "type": "long"
              }
            ],
            "sliceRecordCount": 100
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/stream2doris.json

执行下面的命令

bin/addax.py job/stream2doris.json

输出类似如下:

2021-02-23 15:22:57.851 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO  Engine -
{
"content":[
{
"reader":{
    "parameter":{
            "column":[
                    {
                            "random":"1,500",
                            "type":"long"
                    },
                    {
                            "random":"1,127",
                            "type":"long"
                    },
                    {
                            "type":"string",
                            "value":"username"
                    }
            ],
            "sliceRecordCount":100
    },
    "name":"streamreader"
},
"writer":{
    "parameter":{
            "password":"*****",
            "batchSize":1024,
            "connection":[
                    {
                            "database":"example_db",
                            "endpoint":"http://127.0.0.1:8030/",
                            "table":"table1"
                    }
            ],
            "username":"test"
    },
    "name":"doriswriter"
}
}
],
"setting":{
"speed":{
"channel":2
}
}
}

2021-02-23 15:22:57.886 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load

2021-02-23 15:23:00.941 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-23 15:22:57
任务结束时刻                    : 2021-02-23 15:23:00
任务总计耗时                    :                  3s
任务平均流量                    :            1.56KB/s
记录写入速度                    :             66rec/s
读出记录总数                    :                 200
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
endpoint string Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 |
username string HTTP 签名验证帐号
password string HTTP 签名验证密码
table string 所选取的需要同步的表名
column list 所配置的表中需要同步的列名集合,详细描述见rdbmswriter
batchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
column#

该插件中的 column 不是必须项,如果没有配置该项,或者配置为 ["*"] , 则按照 reader 插件获取的字段值进行顺序拼装。 否则可以按照如下方式指定需要插入的字段

{
  "column": ["siteid","citycode","username"]
}

ElasticSearchWriter 插件文档#

1 快速介绍#

数据导入elasticsearch的插件

2 实现原理#

使用 elasticsearch 的rest api接口, 批量把从reader读入的数据写入elasticsearch

3 功能说明#

3.1 配置样例#
job.json#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random": "10,1000",
                "type": "long"
              },
              {
                "value": "1.1.1.1",
                "type": "string"
              },
              {
                "value": 19890604.0,
                "type": "double"
              },
              {
                "value": 19890604,
                "type": "long"
              },
              {
                "value": 19890604,
                "type": "long"
              },
              {
                "value": "hello world",
                "type": "string"
              },
              {
                "value": "long text",
                "type": "string"
              },
              {
                "value": "41.12,-71.34",
                "type": "string"
              },
              {
                "value": "2017-05-25 11:22:33",
                "type": "string"
              }
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://localhost:9200",
            "index": "test-1",
            "type": "default",
            "cleanup": true,
            "settings": {
              "index": {
                "number_of_shards": 1,
                "number_of_replicas": 0
              }
            },
            "discovery": false,
            "batchSize": 1000,
            "splitter": ",",
            "column": [
              {
                "name": "pk",
                "type": "id"
              },
              {
                "name": "col_ip",
                "type": "ip"
              },
              {
                "name": "col_double",
                "type": "double"
              },
              {
                "name": "col_long",
                "type": "long"
              },
              {
                "name": "col_integer",
                "type": "integer"
              },
              {
                "name": "col_keyword",
                "type": "keyword"
              },
              {
                "name": "col_text",
                "type": "text",
                "analyzer": "ik_max_word"
              },
              {
                "name": "col_geo_point",
                "type": "geo_point"
              },
              {
                "name": "col_date",
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
              },
              {
                "name": "col_nested1",
                "type": "nested"
              },
              {
                "name": "col_nested2",
                "type": "nested"
              },
              {
                "name": "col_object1",
                "type": "object"
              },
              {
                "name": "col_object2",
                "type": "object"
              },
              {
                "name": "col_integer_array",
                "type": "integer",
                "array": true
              },
              {
                "name": "col_geo_shape",
                "type": "geo_shape",
                "tree": "quadtree",
                "precision": "10m"
              }
            ]
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
endpoint ElasticSearch的连接地址
accessId http auth中的user, 默认为空
accessKey http auth中的password
index elasticsearch中的index名
type index名 lasticsearch中index的type名
cleanup false 是否删除原表
batchSize 1000 每次批量数据的条数
trySize 30 失败后重试的次数
timeout 600000 客户端超时时间,单位为毫秒(ms)
discovery false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression true 否是开启http请求压缩
multiThread true 是否开启多线程http请求 |
ignoreWriteError false 写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据
ignoreParseError true 解析数据格式错误时,是否继续写入
alias 数据导入完成后写入别名
aliasMode append 数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
settings 创建index时候的settings, 与elasticsearch官方相同
splitter , 如果插入数据是array,就使用指定分隔符
column elasticsearch所支持的字段类型,文档中给出的样例中包含了全部支持的字段类型
dynamic false 不使用addax的mappings,使用es自己的自动mappings

4 约束限制#

  • 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性

  • 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)

FtpWriter 插件文档#

1 快速介绍#

FtpWriter提供了向远程FTP文件写入CSV格式的一个或者多个文件,在底层实现上,FtpWriter将Addax传输协议下的数据转换为csv格式,并使用FTP相关的网络协议写出到远程FTP服务器。

写入FTP文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。

2 功能与限制#

FtpWriter实现了从Addax协议转为FTP文件功能,FTP文件本身是无结构化数据存储,FtpWriter如下几个方面约定:

  1. 支持且仅支持写入文本类型(不支持BLOB如视频数据)的文件,且要求文本中shema为一张二维表。

  2. 支持类CSV格式文件,自定义分隔符。

  3. 写出时不支持文本压缩。

  4. 支持多线程写入,每个线程写入不同子文件。

我们不能做到:

  1. 单个文件不能支持并发写入。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {},
        "writer": {
          "name": "ftpwriter",
          "parameter": {
            "protocol": "sftp",
            "host": "***",
            "port": 22,
            "username": "xxx",
            "password": "xxx",
            "timeout": "60000",
            "connectPattern": "PASV",
            "path": "/tmp/data/",
            "fileName": "yixiao",
            "writeMode": "truncate|append|nonConflict",
            "fieldDelimiter": ",",
            "encoding": "UTF-8",
            "nullFormat": "null",
            "dateFormat": "yyyy-MM-dd",
            "fileFormat": "csv",
            "header": []
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
protocol ftp服务器协议,目前支持传输协议有ftp和sftp
host ftp服务器地址
port 22/21 若传输协议是sftp协议,默认值是22;若传输协议是标准ftp协议,默认值是21
timeout 60000 连接ftp服务器连接超时时间,单位毫秒(ms)
connectPattern PASV 连接模式,仅支持 PORT, PASV 模式。该参数只在传输协议是标准ftp协议时使用 |
username ftp服务器访问用户名
password ftp服务器访问密码
path 远程FTP文件系统的路径信息,FtpWriter会写入Path目录下属多个文件
fileName FtpWriter写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名
writeMode FtpWriter写入前数据清理处理模式,支持 truncate, append, nonConflict ,详见下文
fieldDelimiter , 描述:读取的字段分隔符
compress 文本压缩类型,暂不支持
encoding utf-8 读取文件的编码配置
dateFormat 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd"
fileFormat text 文件写出的格式,包括csv, text两种,
header text写出时的表头,示例 ['id', 'name', 'age']
nullFormat \N 定义哪些字符串可以表示为null
maxTraversalLevel 100 允许遍历文件夹的最大层数
csvReaderConfig 读取CSV类型文件参数配置,Map类型。读取CSV类型文件使用的CsvReader进行读取,会有很多配置,不配置则使用默认值,详见下文
writeMod#

描述:FtpWriter写入前数据清理处理模式:

  1. truncate,写入前清理目录下一fileName前缀的所有文件。

  2. append,写入前不做任何处理,Addax FtpWriter直接使用filename写入,并保证文件名不冲突。

  3. nonConflict,如果目录下有fileName前缀的文件,直接报错。

3.3 类型转换#

FTP文件本身不提供数据类型,该类型是Addax FtpWriter定义:

Addax 内部类型 FTP文件 数据类型
Long Long -> 字符串序列化表示
Double Double -> 字符串序列化表示
String String -> 字符串序列化表示
Boolean Boolean -> 字符串序列化表示
Date Date -> 字符串序列化表示

其中:

  • Long 是指FTP文件文本中使用整形的字符串表示形式,例如"19901219"。

  • Double 是指FTP文件文本中使用Double的字符串表示形式,例如"3.1415"。

  • Boolean 是指FTP文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。

  • Date 是指FTP文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。

Hbase11XWriter 插件文档#

1 快速介绍#

HbaseWriter 插件实现了从向Hbase中写取数据。在底层实现上,HbaseWriter 通过 HBase 的 Java 客户端连接远程 HBase 服务,并通过 put 方式写入Hbase。

1.1支持功能#
  • 目前HbaseWriter支持源端多个字段拼接作为hbase 表的 rowkey,具体配置参考:rowkeyColumn配置;

  • 写入hbase的时间戳(版本)支持:用当前时间作为版本,指定源端列作为版本,指定一个时间 三种方式作为版本;

1.2 限制#
  1. 目前只支持源端为横表写入,不支持竖表(源端读出的为四元组: rowKey,family:qualifier,timestamp,value )模式的数据写入;

  2. 目前不支持写入hbase前清空表数据

2 实现原理#

简而言之,HbaseWriter 通过 HBase 的 Java 客户端,通过 HTable, Put等 API,将从上游Reader读取的数据写入HBase 你hbase11xwriter与hbase094xwriter的主要不同在于API的调用不同,

3 功能说明#

3.1 配置样例#

配置一个从本地写入hbase1.1.x的作业:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 5,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": "/tmp/normal.txt",
            "charset": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "String"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "string"
              },
              {
                "index": 5,
                "type": "string"
              },
              {
                "index": 6,
                "type": "string"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hbase11xwriter",
          "parameter": {
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "***"
            },
            "table": "writer",
            "mode": "normal",
            "rowkeyColumn": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": -1,
                "type": "string",
                "value": "_"
              }
            ],
            "column": [
              {
                "index": 1,
                "name": "cf1:q1",
                "type": "string"
              },
              {
                "index": 2,
                "name": "cf1:q2",
                "type": "string"
              },
              {
                "index": 3,
                "name": "cf1:q3",
                "type": "string"
              },
              {
                "index": 4,
                "name": "cf2:q1",
                "type": "string"
              },
              {
                "index": 5,
                "name": "cf2:q2",
                "type": "string"
              },
              {
                "index": 6,
                "name": "cf2:q3",
                "type": "string"
              }
            ],
            "versionColumn": {
              "index": -1,
              "value": "123456789"
            },
            "encoding": "utf-8"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
hbaseConfig 连接HBase集群需要的配置信息,JSON格式, hbase.zookeeper.quorum为必填项,其他 HBase client的配置为可选项
mode 写乳hbase的模式,目前仅支持 normal 模式
table 要读取的 hbase 表名(大小写敏感)
encoding UTF-8 编码方式,UTF-8 或是 GBK,用于对二进制存储的 HBase byte[] 转为 String 时的编码
column 要写入的hbase字段,normal 模式与multiVersionFixedColumn 模式下必填项, 详细说明见下文
rowkeyColumn 要写入的hbase的rowkey列, 详细说明见下文
versionColumn 指定写入hbase的时间戳。支持:当前时间、指定时间列,指定时间,三者选一,详见下文
nullMode skip 读取的null值时,如何处理, skip 表示不向hbase写这列;empty:写入 HConstants.EMPTY_BYTE_ARRAY,即new byte [0]
walFlag false 是否写WAL, true表示写入, false 表示不写
writeBufferSize 8M 设置HBae client的写buffer大小,单位字节
maxVersion 指定在多版本模式下的hbasereader读取的版本数,取值只能为-1或者大于1的数字,-1表示读取所有版本, multiVersionFixedColumn模式下必填
range 指定hbasereader读取的rowkey范围, 详见下文
scanCacheSize 256 Hbase client每次rpc从服务器端读取的行数
scanBatchSize 100 Hbase client每次rpc从服务器端读取的列数
column#

要写入的hbase字段。index:指定该列对应reader端column的索引,从0开始;name:指定hbase表中的列,必须为 列族:列名 的格式;type:指定写入数据类型,用于转换HBase byte[]。配置格式如下:

"column": [
{
"index": 1,
"name": "cf1:q1",
"type": "string"
},
{
"index": 2,
"name": "cf1:q2",
"type": "string"
}
]
rowkeyColumn#

要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量。配置格式如下:

"rowkeyColumn": [
{
"index": 0,
"type": "string"
},
{
"index": -1,
"type": "string",
"value": "_"
}
]
versionColumn#

指定写入hbase的时间戳。支持:当前时间、指定时间列,指定时间,三者选一。若不配置表示用当前时间。

index:指定对应reader端column的索引,从0开始,需保证能转换为long,若是Date类型, 会尝试用yyyy-MM-dd HH:mm:ssyyyy-MM-dd HH:mm:ss SSS去解析; 若为指定时间index为 -1

value:指定时间的值,long值。配置格式如下:

"versionColumn":{
"index": 1
}

或者

"versionColumn":{
"index": -1,
"value": 123456789
}
3.3 HBase支持的列类型#
  • BOOLEAN

  • SHORT

  • INT

  • LONG

  • FLOAT

  • DOUBLE

  • STRING

请注意: 除上述罗列字段类型外,其他类型均不支持

HBase11xsqlwriter 插件文档#

1. 快速介绍#

HBase11xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功能。Phoenix因为对rowkey做了数据编码,所以,直接使用HBaseAPI进行写入会面临手工数据转换的问题,麻烦且易错。本插件提供了单间的SQL表的数据导入方式。

在底层实现上,通过Phoenix的JDBC驱动,执行UPSERT语句向hbase写入数据。

1.1 支持的功能#

支持带索引的表的数据导入,可以同步更新所有的索引表

1.2 限制#
  • 仅支持1.x系列的hbase

  • 仅支持通过phoenix创建的表,不支持原生HBase表

  • 不支持带时间戳的数据导入

2. 实现原理#

通过Phoenix的JDBC驱动,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以,可以同步更新索引表。

3. 配置说明#

3.1 配置样例#
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": "/tmp/normal.txt",
            "charset": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "String"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hbase11xsqlwriter",
          "parameter": {
            "batchSize": "256",
            "column": [
              "UID",
              "TS",
              "EVENTID",
              "CONTENT"
            ],
            "haveKerberos": "true",
            "kerberosPrincipal": "hive@EXAMPLE.COM",
            "kerberosKeytabFilePath": "/tmp/hive.headless.keytab",
            "hbaseConfig": {
              "hbase.zookeeper.quorum": "node1,node2,node3:2181",
              "zookeeper.znode.parent": "/hbase-secure"
            },
            "nullMode": "skip",
            "table": "TEST_TBL"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5,
        "bytes": -1
      }
    }
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
hbaseConfig hbase集群地址,zk为必填项,格式:ip1,ip2,ip3[:port],znode是可选的,默认值是 /hbase
table 要导入的表名,大小写敏感,通常phoenix表都是大写表名
column 列名,大小写敏感,通常phoenix的列名都是大写,数据类型无需填写,会自动获取列
batchSize 256 一次写入的最大记录数
nullMode skip 读取到的列值为null时,如何处理。支持 skip, empty,前者表示跳过该列,后者表示插入空值,数值类型为0,字符类型为null
haveKerberos false 是否启用Kerberos认证, true 表示启用, false 表示不启用
kerberosPrincipal null kerberos 凭证信息,仅当 havekerberos 启用后有效
kerberosKeytabFilePath null kerberos 凭证文件的绝对路径,仅当 havekerberos 启用后有效

注意:启用kerberos认证后,程序需要知道hbase-site.xml 所在的路径,一种办法是运行执行在环境变量 CLASSPATH 中增加该文件的所在路径。

另外一个解决办法是修改 addax.py 中的 CLASS_PATH 变量,增加 hbase-site.xml 的路径

HBase20xsqlwriter 插件文档#

1. 快速介绍#

HBase20xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功能。Phoenix因为对rowkey做了数据编码,所以,直接使用HBaseAPI进行写入会面临手工数据转换的问题,麻烦且易错。本插件提供了SQL方式直接向Phoenix表写入数据。

在底层实现上,通过Phoenix QueryServer的轻客户端驱动,执行UPSERT语句向Phoenix写入数据。

1.1 支持的功能#

支持带索引的表的数据导入,可以同步更新所有的索引表

1.2 限制#
  1. 要求版本为Phoenix5.x及HBase2.x

  2. 仅支持通过Phoenix QeuryServer导入数据,因此您Phoenix必须启动QueryServer服务才能使用本插件

  3. 不支持清空已有表数据

  4. 仅支持通过phoenix创建的表,不支持原生HBase表

  5. 不支持带时间戳的数据导入

2. 实现原理#

通过Phoenix轻客户端,连接Phoenix QueryServer服务,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以,可以同步更新索引表。

3. 配置说明#

3.1 配置样例#
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": "/tmp/normal.txt",
            "charset": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "String"
              },
              {
                "index": 1,
                "type": "string"
              },
              {
                "index": 2,
                "type": "string"
              },
              {
                "index": 3,
                "type": "string"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "hbase20xsqlwriter",
          "parameter": {
            "batchSize": "100",
            "column": [
              "UID",
              "TS",
              "EVENTID",
              "CONTENT"
            ],
            "queryServerAddress": "http://127.0.0.1:8765",
            "nullMode": "skip",
            "table": "TEST_TBL"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5,
        "bytes": -1
      }
    }
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
queryServerAddress Phoenix QueryServer 地址, 该插件通过 PQS 进行连接
serialization PROTOBUF QueryServer使用的序列化协议
table 所要读取表名
schema 表所在的schema
batchSize 256 一次批量写入的最大行数
column 全部列 列名,大小写敏感,通常phoenix的列名都是大写, 数据类型无需填写,会自动获取列
nullMode skip 读取的null值时,如何处理, skip 表示不向hbase写这列;empty:写入 HConstants.EMPTY_BYTE_ARRAY,即new byte [0]

Addax HdfsWriter 插件文档#

1 快速介绍#

HdfsWriter提供向HDFS文件系统指定路径中写入 TEXTFileORCFile , PARQUET 文件, 文件内容可与 hive 中表关联。

2 功能与限制#

  1. 目前HdfsWriter仅支持 textfile ,orcfile, parquet 三种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表;

  2. 由于HDFS是文件系统,不存在schema的概念,因此不支持对部分列写入;

  3. 目前仅支持与以下Hive数据类型:

    • 数值型:TINYINT(txt或ORC), SMALLINT(txt或ORC), INT(orc或parquet), INTEGER(txt或ORC), BIGINT(txt或ORC), LONG(parquet), FLOAT(orc或parquet), DOUBLE(orc或parquet), DECIMAL(orc或TXT), DECIMAL(18.9) ( 只有PARQUET必须带精度)

    • 字符串类型:STRING(TXT/orc或parquet),VARCHAR(TXT/orc),CHAR(TXT/orC)

    • 布尔类型:BOOLEAN(TXT/orc或parquet)

    • 时间类型:DATE(TXT/orC),TIMESTAMP(TXT/orC)

目前不支持:binary、arrays、maps、structs、union类型

  1. 对于Hive分区表目前仅支持一次写入单个分区;

  2. 对于textfile需用户保证写入hdfs文件的分隔符与在Hive上创建表时的分隔符一致,从而实现写入hdfs数据与Hive表字段关联;

  3. HdfsWriter实现过程是:首先根据用户指定的path,在path目录下,创建点开头(.)的临时目录,创建规则:.path_<uuid>;然后将读取的文件写入这个临时目录;全部写入后再将这个临时目录下的文件移动到用户指定目录(在创建文件时保证文件名不重复); 最后删除临时目录。如果在中间过程发生网络中断等情况造成无法与hdfs建立连接,需要用户手动删除已经写入的文件和临时目录。

  4. 目前插件中Hive版本为3.1.1,Hadoop版本为 3.1.1, ,在Hadoop 2.7.x, Hadoop 3.1.x 和 Hive 2.x, Hive 3.1.x 测试环境中写入正常;其它版本理论上都支持,但用于生产之前建议进一步测试;

  5. 目前HdfsWriter支持Kerberos认证

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "Addax",
                "type": "string"
              },
              {
                "value": 19890604,
                "type": "long"
              },
              {
                "value": "1989-06-04 00:00:00",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "test",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 1000
          },
          "writer": {
            "name": "hdfswriter",
            "parameter": {
              "defaultFS": "hdfs://xxx:port",
              "fileType": "orc",
              "path": "/user/hive/warehouse/writerorc.db/orcfull",
              "fileName": "xxxx",
              "column": [
                {
                  "name": "col1",
                  "type": "string"
                },
                {
                  "name": "col2",
                  "type": "int"
                },
                {
                  "name": "col3",
                  "type": "string"
                },
                {
                  "name": "col4",
                  "type": "boolean"
                },
                {
                  "name": "col5",
                  "type": "string"
                }
              ],
              "writeMode": "overwrite",
              "fieldDelimiter": "\u0001",
              "compress": "SNAPPY"
            }
          }
        }
      }
    ]
  }
}
3.2 配置项说明#
配置项 是否必须 默认值
path
defaultFS
fileType
fileName
column 默认类型为 String
writeMode
fieldDelimiter ,
encoding utf-8
nullFormat
haveKerberos
kerberosKeytabFilePath
kerberosPrincipal
compress
hadoopConfig
path#

存储到Hadoop hdfs文件系统的路径信息,HdfsWriter 会根据并发配置在 Path 目录下写入多个文件。为与hive表关联,请填写hive表在hdfs上的存储路径。 例:Hive上设置的数据仓库的存储路径为:/user/hive/warehouse/ ,已建立数据库:test,表:hello; 则对应的存储路径为:/user/hive/warehouse/test.db/hello (如果建表时指定了location 属性,则依据该属性的路径)

defaultFS#

Hadoop hdfs文件系统namenode节点地址。格式:hdfs://ip:port ;例如:hdfs://127.0.0.1:9000 , 如果启用了HA,则为 servicename 模式,比如 hdfs://sandbox

fileType#

描述:文件的类型,目前只支持用户配置为

  • text 表示 Text file文件格式

  • orc 表示 OrcFile文件格式

  • parquet 表示 Parquet 文件格式

  • rc 表示 Rcfile 文件格式

  • seq 表示sequence file文件格式

  • csv 表示普通hdfs文件格式(逻辑二维表)

fileName#

HdfsWriter写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。

column#

写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型, 其中:name指定字段名,type指定字段类型。

用户可以指定 column 字段信息,配置如下:

{
  "column": [
    {
      "name": "userName",
      "type": "string"
    },
    {
      "name": "age",
      "type": "long"
    },
    {
      "name": "salary",
      "type": "decimal(8,2)"
    }
  ]
}

对于数据类型是 decimal 类型的,需要注意:

  1. 如果没有指定精度和小数位,则使用默认的 decimal(38,10) 表示

  2. 如果仅指定了精度但未指定小数位,则小数位用0表示,即 decimal(p,0)

  3. 如果都指定,则使用指定的规格,即 decimal(p,s)

writeMode#

描述:hdfswriter写入前数据清理处理模式:

  • append,写入前不做任何处理,Addax hdfswriter直接使用filename写入,并保证文件名不冲突。

  • overwrite 如果写入目录存在数据,则先删除,后写入

  • nonConflict,如果目录下有fileName前缀的文件,直接报错。

fieldDelimiter#

hdfswriter写入时的字段分隔符, 需要用户保证与创建的Hive表的字段分隔符一致,否则无法在Hive表中查到数据,如果写入的文件格式为 orc, parquet ,rcfile 等二进制格式,则该参数并不起作用

encoding#

写文件的编码配置,默认为 utf-8 慎重修改

compress#

描述:hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)

hadoopConfig#

hadoopConfig 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置

"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "nn01,nn02",
"dfs.namenode.rpc-address.testDfs.namenode1": "192.168.1.1",
"dfs.namenode.rpc-address.testDfs.namenode2": "192.168.1.2",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
haveKerberos#

是否有Kerberos认证,默认 false, 如果用户配置true,则配置项 kerberosKeytabFilePathkerberosPrincipal 为必填。

kerberosKeytabFilePath#

Kerberos认证 keytab文件路径,绝对路径

kerberosPrincipal#

描述:Kerberos认证Principal名,如 xxxx/hadoopclient@xxx.xxx

3.3 类型转换#

目前 HdfsWriter 支持大部分 Hive 类型,请注意检查你的类型。

下面列出 HdfsWriter 针对 Hive 数据类型转换列表:

Addax 内部类型 HIVE 数据类型
Long TINYINT,SMALLINT,INT,INTEGER,BIGINT
Double FLOAT,DOUBLE,DECIMAL
String STRING,VARCHAR,CHAR
Boolean BOOLEAN
Date DATE,TIMESTAMP
Bytes BINARY

InfluxDBWriter#

InfluxDBWriter 插件实现了将数据写入 InfluxDB 读取数据的功能。 底层实现上,是通过调用 InfluQL 语言接口,构建插入语句,然后进行数据插入。

示例#

以下示例用来演示该插件从内存读取数据并写入到指定表

创建需要的库#

通过以下命令来创建需要写入的库

# create database
influx --execute "CREATE DATABASE addax"
创建 job 文件#

创建 job/stream2kudu.json 文件,内容如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random":"2001-01-01 00:00:00, 2016-07-07 23:59:59",
                "type":"date"
              },
              {
                "random": "1,1000",
                "type": "long"
              },
              {
                "random": "1,10",
                "type": "string"
              },
              {
                "random": "1000,50000",
                "type": "double"
              }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "influxdbwriter",
          "parameter": {
            "connection": [
              {
                "endpoint": "http://localhost:8086",
                "database": "addax",
                "table": "addax_tbl"
              }
            ],
            "connTimeout": 15,
            "readTimeout": 20,
            "writeTimeout": 20,
            "username": "influx",
            "password": "influx123",
            "column": [
              {"name":"time", "type":"timestamp"},
              {"name":"user_id","type":"int"},
              {"name":"user_name", "type":"string"},
              {"name":"salary", "type":"double"}
            ],
            "preSql": ["delete from addax_tbl"],
            "batchSize": 1024,
            "retentionPolicy": {"name":"one_day_only", "duration": "1d", "replication":1}
          }
        }
      }
    ]
  }
}
运行#

执行下面的命令进行数据采集

bin/addax.py job/stream2kudu.json

参数说明#

配置项 是否必须 数据类型 默认值 描述
endpoint string InfluxDB 连接串
username string 数据源的用户名
password string 数据源指定用户名的密码
database string 数据源指定的数据库
table string 要写入的表(指标)
column list 所配置的表中需要同步的列名集合
connTimeout int 15 设置连接超时值,单位为秒
readTimeout int 20 设置读取超时值,单位为秒
writeTimeout int 20 设置写入超时值,单位为秒
preSql list 插入数据前执行的SQL语句
postSql list 数据插入完毕后需要执行的语句
retentionPolicy dict 设置数据库的 Retention Policy 策略
column#

InfluxDB 作为时许数据库,需要每条记录都有时间戳字段,因此这里会把 column 配置的第一个字段默认当作时间戳

retentionPolicy#

设定数据库的 Retention Policy 策略,依据给定的配置,在指定数据库上创建一条 Retention Policy 信息。 有关 Retention Policy 更详细的信息,可以参考官方文档

类型转换#

当前支持 InfluxDB 的基本类型

限制#

  1. 当前插件仅支持 1.x 版本,2.0 及以上并不支持

KuduWriter#

KuduWriter 插件实现了将数据写入到 kudu 的能力,当前是通过调用原生RPC接口来实现的。 后期希望通过 impala 接口实现,从而增加更多的功能。

示例#

以下示例演示了如何从内存读取样例数据并写入到 kudu 表中的。

表结构#

我们用 trino 工具连接到 kudu 服务,然后通过下面的 SQL 语句创建表

CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar,
  salary double
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);
job 配置文件#

创建 job/stream2kudu.json 文件,内容如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random": "1,1000",
                "type": "long"
              },
              {
                "random": "1,10",
                "type": "string"
              },
              {
                "random": "1000,50000",
                "type": "double"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "kuduwriter",
          "parameter": {
            "masterAddress": "127.0.0.1:7051,127.0.0.1:7151,127.0.0.1:7251",
            "timeout": 60,
            "table": "users",
            "writeMode": "upsert",
            "column": [
              {"name":"user_id","type":"int8"},
              {"name":"user_name", "type":"string"},
              {"name":"salary", "type":"double"}
            ],
            "batchSize": 1024,
            "bufferSize": 2048,
            "skipFail": false,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}
运行#

执行下下面的命令进行数据采集

bin/addax.py job/stream2kudu.json

参数说明#

配置项 是否必须 类型 默认值 描述
masterAddress 必须 string Kudu Master集群RPC地址,多个地址用逗号(,)分隔
table 必须 string kudu 表名
writeMode string upsert 表数据写入模式,支持 upsert, insert 两者
timeout int 60 写入数据超时时间(秒)
column list 要写入的表字段及类型,如果配置为 "*" ,则会从目标表中读取所有字段
skipFail boolean false 是否跳过插入失败的记录,如果设置为true,则插件不会把插入失败的当作异常

已知限制#

  1. 暂时不支持 truncate table

MongoDBWriter 插件文档#

1 快速介绍#

MongoDBWriter 插件利用 MongoDB 的java客户端MongoClient进行MongoDB的写操作。最新版本的Mongo已经将DB锁的粒度从DB级别降低到document级别,配合上MongoDB强大的索引功能,基本可以满足数据源向MongoDB写入数据的需求,针对数据更新的需求,通过配置业务主键的方式也可以实现。

2 实现原理#

MongoDBWriter通过Datax框架获取Reader生成的数据,然后将Datax支持的类型通过逐一判断转换成MongoDB支持的类型。其中一个值得指出的点就是Datax本身不支持数组类型,但是MongoDB支持数组类型,并且数组类型的索引还是蛮强大的。为了使用MongoDB的数组类型,则可以通过参数的特殊配置,将字符串可以转换成MongoDB中的数组。类型转换之后,就可以依托于Datax框架并行的写入MongoDB。

3 功能说明#

3.1 配置样例#

该示例将流式数据写入到 MongoDB 表中

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "unique_id",
                "type": "string"
              },
              {
                "value": "sid",
                "type": "string"
              },
              {
                "value": "user_id",
                "type": "string"
              },
              {
                "value": "auction_id",
                "type": "string"
              },
              {
                "value": "content_type",
                "type": "string"
              },
              {
                "value": "pool_type",
                "type": "string"
              },
              {
                "value": "a1 a2 a3",
                "type": "string"
              },
              {
                "value": "c1 c2 c3",
                "type": "string"
              },
              {
                "value": "2020-09-06",
                "type": "string"
              },
              {
                "value": "tag1 tag2 tag3",
                "type": "string"
              },
              {
                "value": "property",
                "type": "string"
              },
              {
                "value": 1984,
                "type": "long"
              },
              {
                "value": 1900,
                "type": "long"
              },
              {
                "value": 75,
                "type": "long"
              }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "mongodbwriter",
          "parameter": {
            "address": [
              "127.0.0.1:32768"
            ],
            "userName": "",
            "userPassword": "",
            "dbName": "tag_per_data",
            "collectionName": "tag_data",
            "column": [
              {
                "name": "unique_id",
                "type": "string"
              },
              {
                "name": "sid",
                "type": "string"
              },
              {
                "name": "user_id",
                "type": "string"
              },
              {
                "name": "auction_id",
                "type": "string"
              },
              {
                "name": "content_type",
                "type": "string"
              },
              {
                "name": "pool_type",
                "type": "string"
              },
              {
                "name": "frontcat_id",
                "type": "Array",
                "splitter": " "
              },
              {
                "name": "categoryid",
                "type": "Array",
                "splitter": " "
              },
              {
                "name": "gmt_create",
                "type": "string"
              },
              {
                "name": "taglist",
                "type": "Array",
                "splitter": " "
              },
              {
                "name": "property",
                "type": "string"
              },
              {
                "name": "scorea",
                "type": "int"
              },
              {
                "name": "scoreb",
                "type": "int"
              },
              {
                "name": "scorec",
                "type": "int"
              }
            ],
            "upsertInfo": {
              "isUpsert": "true",
              "upsertKey": "unique_id"
            }
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
address MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出
userName MongoDB的用户名
userPassword MongoDB的密码
collectionName MonogoDB的集合名
column MongoDB的文档列名
name Column的名字
type Column的类型
splitter 特殊分隔符,当且仅当要处理的字符串要用分隔符分隔为字符数组时,才使用这个参数,通过这个参数指定的分隔符,将字符串分隔存储到MongoDB的数组中
upsertInfo 指定了传输数据时更新的信息
isUpsert 当设置为true时,表示针对相同的upsertKey做更新操作
upsertKey upsertKey指定了没行记录的业务主键。用来做更新时使用

4 类型转换#

Addax 内部类型 MongoDB 数据类型
Long int, Long
Double double
String string, array
Date date
Boolean boolean
Bytes bytes

MysqlWriter#

MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。在底层实现上, MysqlWriter 通过 JDBC 连接远程 Mysql 数据库,并执行相应的 insert into ... 或者 ( replace into ...) 的 sql 语句将数据写入 Mysql,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。

MysqlWriter 面向ETL开发工程师,他们使用 MysqlWriter 从数仓导入数据到 Mysql。同时 MysqlWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

MysqlWriter 通过 Addax 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行) 或者 replace into...(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。出于性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。

示例#

假定要写入的 MySQL 表建表语句如下:

create table test.addax_tbl 
(
col1 varchar(20) ,
col2 int(4),
col3 datetime,
col4 boolean,
col5 binary
) default charset utf8;

这里使用一份从内存产生到 Mysql 导入的数据。

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "Addax",
                "type": "string"
              },
              {
                "value": 19880808,
                "type": "long"
              },
              {
                "value": "1988-08-08 08:08:08",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "test",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "root",
            "password": "",
            "column": [
              "*"
            ],
            "session": [
              "set session sql_mode='ANSI'"
            ],
            "preSql": [
              "delete from @table"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useSSL=false",
                "table": [
                  "addax_tbl"
                ],
                "driver": "com.mysql.jdbc.Driver"
              }
            ]
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/stream2mysql.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/stream2mysql.json

参数说明#

配置项 是否必须 类型 默认值 描述
jdbcUrl list 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
driver string 自定义驱动类名,解决兼容性问题,详见下面描述
username string 数据源的用户名
password string 数据源指定用户名的密码
table list 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column list 所配置的表中需要同步的列名集合,详细描述见rdbmswriter
session list Addax在获取Mysql连接时,执行session指定的SQL语句,修改当前connection session属性
preSql list 数据写入钱先执行的sql语句,例如清除旧数据,如果 Sql 中有你需要操作到的表名称,可用 @table 表示
postSql list 数据写入完成后执行的sql语句,例如加上某一个时间戳
writeMode string insert 数据写入表的方式, insert 表示采用 insert into , replace表示采用replace into方式 update 表示采用 ON DUPLICATE KEY UPDATE 语句
batchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
driver#

当前 Addax 采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver,而不是 com.mysql.jdbc.Driver。 如果你需要采集的 MySQL 服务低于 5.6,需要使用到 Connector/J 5.1 驱动,则可以采取下面的步骤:

替换插件内置的驱动

rm -f plugin/writer/mysqlwriter/lib/mysql-connector-java-*.jar

拷贝老的驱动到插件目录

cp mysql-connector-java-5.1.48.jar plugin/writer/mysqlwriter/lib/

指定驱动类名称

在你的 json 文件类,配置 "driver": "com.mysql.jdbc.Driver"

类型转换#

目前 MysqlWriter 支持大部分 Mysql 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出 MysqlWriter 针对 Mysql 类型转换列表:

Addax 内部类型 Mysql 数据类型
Long int, tinyint, smallint, mediumint, int, bigint, year
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

bit类型目前是未定义类型转换

OracleWriter 插件文档#

1 快速介绍#

OracleWriter 插件实现了写入数据到 Oracle 主库的目的表的功能。在底层实现上, OracleWriter 通过 JDBC 连接远程 Oracle 数据库,并执行相应的 insert into ... 语句将数据写入 Oracle,内部会分批次提交入库。

OracleWriter 面向 ETL 开发工程师,他们使用 OracleWriter 从数仓导入数据到 Oracle。同时 OracleWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

2 实现原理#

OracleWriter 通过 Addax 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL语句

注意:

  1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into...的权限,是否需要其他权限,取决于你任务配置中在 preSqlpostSql 中指定的语句。

  2. OracleWriter 和 MysqlWriter不同,不支持配置 writeMode 参数。

3 功能说明#

3.1 配置样例#
  • 这里使用一份从内存产生到 Oracle 导入的数据。

{
"job": {
    "setting": {
        "speed": {
            "channel": 1,
          "bytes": -1
        }
    },
    "content": [{
        "reader": {
        "name": "streamreader",
        "parameter": {
            "column" : [
                {
                    "value": "Addax",
                    "type": "string"
                },
                {
                    "value": 19880808,
                    "type": "long"
                },
                {
                    "value": "1988-08-08 08:08:08",
                    "type": "date"
                },
                {
                    "value": true,
                    "type": "bool"
                },
                {
                    "value": "test",
                    "type": "bytes"
                }
            ],
            "sliceRecordCount": 1000
        }
        },
        "writer": {
            "name": "oraclewriter",
            "parameter": {
                "username": "root",
                "password": "root",
                "column": [
                    "id",
                    "name"
                ],
                "preSql": [
                    "delete from test"
                ],
                "connection": [{
                    "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]",
                    "table": ["test"]
                }]
            }
        }
    }]
}
}
3.2 参数说明#
配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
writeMode insert 写入方式,支持 insert, update,详见下文
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述rdbmswriter
preSql 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table表示
postSql 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳
batchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
session 设置oracle连接时的session信息, 详见下文
writeMode#

默认情况下, 采取 insert into 语法写入 Oracle 表,如果你希望采取主键存在时更新,不存在则写入的方式,也就是 Oracle 的 merge into 语法, 可以使用 update 模式。假定表的主键为 id ,则 writeMode 配置方法如下:

"writeMode": "update(id)"

如果是联合唯一索引,则配置方法如下:

"writeMode": "update(col1, col2)"

注: update 模式在 3.1.6 版本首次增加,之前版本并不支持。

session#

描述:设置oracle连接时的session信息,格式示例如下:

"session":[
    "alter session set nls_date_format = 'dd.mm.yyyy hh24:mi:ss';"
    "alter session set NLS_LANG = 'AMERICAN';"
]
3.3 类型转换#

类似 OracleReader ,目前 OracleWriter 支持大部分 Oracle 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

下面列出 OracleWriter 针对 Oracle 类型转换列表:

Addax 内部类型 Oracle 数据类型
Long NUMBER,INTEGER,INT,SMALLINT
Double NUMERIC,DECIMAL,FLOAT,DOUBLE PRECISION,REAL
String LONG,CHAR,NCHAR,VARCHAR,VARCHAR2,NVARCHAR2,CLOB,NCLOB,CHARACTER,CHARACTER VARYING,CHAR VARYING,NATIONAL CHARACTER,NATIONAL CHAR,NATIONAL CHARACTER VARYING,NATIONAL CHAR VARYING,NCHAR VARYING
Date TIMESTAMP,DATE
Boolean bit, bool
Bytes BLOB,BFILE,RAW,LONG RAW

PostgresqlWriter#

PostgresqlWriter插件实现了写入数据到 PostgreSQL主库目的表的功能。在底层实现上,PostgresqlWriter通过JDBC连接远程 PostgreSQL 数据库,并执行相应的 insert into ... sql 语句将数据写入 PostgreSQL,内部会分批次提交入库。

PostgresqlWriter面向ETL开发工程师,他们使用PostgresqlWriter从数仓导入数据到PostgreSQL。同时 PostgresqlWriter亦可以作为数据迁移工具为DBA等用户提供服务。

示例#

以下配置演示从postgresql指定的表读取数据,并插入到具有相同表结构的另外一张表中,用来测试该插件所支持的数据类型。

表结构信息#

假定建表语句以及输入插入语句如下:

create table if not exists addax_tbl 
(
    c_bigint bigint,
    c_bit bit(3),
    c_bool boolean,
    c_byte bytea,
    c_char char(10),
    c_varchar varchar(20),
    c_date  date,
    c_double float8,
    c_int integer,
    c_json json,
    c_number decimal(8,3),
    c_real  real,
    c_small smallint,
    c_text  text,
    c_ts timestamp,
    c_uuid uuid,
    c_xml xml,
    c_money money,
    c_inet inet,
    c_cidr cidr,
    c_macaddr macaddr
);
insert into addax_tbl values(
    999988887777,
    B'101',
    TRUE,
    '\xDEADBEEF',
    'hello',
    'hello, world',
    '2021-01-04',
    999888.9972,
    9876542,
    '{"bar": "baz", "balance": 7.77, "active": false}'::json,
    12345.123,
    123.123,
    126,
    'this is a long text ',
    '2020-01-04 12:13:14',
    'A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A11'::uuid,
    '<foo>bar</foo>'::xml,
    '52093.89'::money,
    '192.168.1.1'::inet,
    '192.168.1/24'::cidr,
    '08002b:010203'::macaddr
);

创建需要插入的表的语句如下:

create table addax_tbl1 ( like addax_tbl);
任务配置#

以下是配置文件

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "pgtest",
            "password": "pgtest",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "addax_tbl"
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://localhost:5432/pgtest"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "column": [
              "*"
            ],
            "preSql": [
              "truncate table @table"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/pgtest",
                "table": [
                  "addax_tbl1"
                ]
              }
            ],
            "username": "pgtest",
            "password": "pgtest",
            "writeMode": "insert"
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/pg2pg.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/pg2pg.json

参数说明#

配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
writeMode insert 写入模式,支持insert, update 详见如下
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述rdbmswriter
preSql 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table表示
postSql 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳
batchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
writeMode#

默认情况下, 采取 insert into 语法写入 postgresql 表,如果你希望采取主键存在时更新,不存在则写入的方式, 可以使用 update 模式。假定表的主键为 id ,则 writeMode 配置方法如下:

"writeMode": "update(id)"

如果是联合唯一索引,则配置方法如下:

"writeMode": "update(col1, col2)"

注: update 模式在 3.1.6 版本首次增加,之前版本并不支持。

类型转换#

目前 PostgresqlWriter支持大部分 PostgreSQL类型,但也存在部分没有支持的情况,请注意检查你的类型。

下面列出 PostgresqlWriter针对 PostgreSQL类型转换列表:

Addax 内部类型 PostgreSQL 数据类型
Long bigint, bigserial, integer, smallint, serial
Double double precision, money, numeric, real
String varchar, char, text, bit, inet,cidr,macaddr,uuid,xml,json
Date date, time, timestamp
Boolean bool
Bytes bytea

已知限制#

除以上列出的数据类型外,其他数据类型理论上均为转为字符串类型,但不确保准确性

RDBMS Writer#

RDBMSWriter 插件支持从传统 RDBMS 读取数据。这是一个通用关系数据库读取插件,可以通过注册数据库驱动等方式支持更多关系数据库读取。

同时 RDBMS Writer 又是其他关系型数据库读取插件的的基础类。以下读取插件均依赖该插件

  • Oracle Writer

  • MySQL Writer

  • PostgreSQL Writer

  • ClickHouse Writer

  • SQLServer Writer

配置说明#

配置一个写入RDBMS的作业。

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "Addax",
                "type": "string"
              },
              {
                "value": 19880808,
                "type": "long"
              },
              {
                "value": "1988-08-08 08:08:08",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "test",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "rdbmswriter",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": "jdbc:dm://ip:port/database",
                "driver": "",
                "table": [
                  "table"
                ]
              }
            ],
            "username": "username",
            "password": "password",
            "column": [
              "*"
            ],
            "preSql": [
              "delete from XXX;"
            ]
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 数据类型 默认值 描述
jdbcUrl string 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息 |
driver string 自定义驱动类名,解决兼容性问题,详见下面描述
username string 数据源的用户名
password string 数据源指定用户名的密码
table array 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column array 所配置的表中需要同步的列名集合,详细描述见后 |
preSql array 执行数据同步任务之前率先执行的sql语句,目前只允许执行一条SQL语句,例如清除旧数据,涉及到的表可用 @table表示
postSql array 执行数据同步任务之后执行的sql语句,目前只允许执行一条SQL语句,例如加上某一个时间戳
batchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
column#

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用 * 代表默认使用所有列配置,例如 ["*"]

支持列裁剪,即列可以挑选部分列进行导出。

支持列换序,即列可以不按照表schema信息进行导出。

支持常量配置,用户需要按照JSON格式:

["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]

  • id 为普通列名

  • `table` 为包含保留在的列名,

  • 1 为整形数字常量,

  • 'bazhen.csy'为字符串常量

  • null 为空指针,注意,这里的 null 必须以字符串形式出现,即用双引号引用

  • to_char(a + 1)为表达式,

  • 2.3 为浮点数,

  • true 为布尔值,同样的,这里的布尔值也必须用双引号引用

Column必须显示填写,不允许为空!

jdbcUrl#

jdbcUrl 配置除了配置必要的信息外,我们还可以在增加每种特定驱动的特定配置属性,这里特别提到我们可以利用配置属性对代理的支持从而实现通过代理访问数据库的功能。 比如对于 PrestoSQL 数据库的 JDBC 驱动而言,支持 socksProxy 参数,比如一个可能的 jdbcUrl

jdbc:presto://127.0.0.1:8080/hive?socksProxy=192.168.1.101:1081

大部分关系型数据库的 JDBC 驱动支持 socksProxyHost,socksProxyPort 参数来支持代理访问。也有一些特别的情况。

以下是各类数据库 JDBC 驱动所支持的代理类型以及配置方式

数据库 代理类型 代理配置 例子
MySQL socks socksProxyHost,socksProxyPort socksProxyHost=192.168.1.101&socksProxyPort=1081
Presto socks socksProxy socksProxy=192.168.1.101:1081
Presto http httpProxy httpProxy=192.168.1.101:3128
driver#

大部分情况下,一个数据库的JDBC驱动是固定的,但有些因为版本的不同,所建议的驱动类名不同,比如 MySQL。 新的 MySQL JDBC 驱动类型推荐使用 com.mysql.cj.jdbc.Driver 而不是以前的 com.mysql.jdbc.Drver。如果想要使用就的驱动名称,则可以配置 driver 配置项。

RedisWriter 插件文档#

1 快速介绍#

RedisWrite 提供了还原Redis dump命令的能力,并写入到目标Redis。支持redis cluster集群、proxy、以及单机

2 功能与限制#

  1. 支持写入redis cluster集群、proxy、以及单机。

我们暂时不能做到:

  1. 只支持写入Redis数据源。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "redisreader",
          "parameter": {
            "connection": [
              {
                "uri": "file:///root/dump.rdb",
                "uri": "http://localhost/dump.rdb",
                "uri": "tcp://127.0.0.1:7001",
                "uri": "tcp://127.0.0.1:7002",
                "uri": "tcp://127.0.0.1:7003"
              }
            ]
          }
        },
        "writer": {
          "name": "rediswriter",
          "parameter": {
            "connection": [
              {
                "uri": "tcp://127.0.0.1:6379",
                "auth": "123456"
              }
            ],
            "redisCluster": false,
            "flushDB": false
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    }
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
uri redis链接,,如果是集群,单机/proxy/redis cluster集群只需填写一个地址即可, 程序会自动获取集群里的所有master
redisCluster false redis cluster集群请务必填写此项,否者无法定位slot槽。如果是proxy或单机忽略该项
flushDB false 迁移前格式化目标Redis
batchSize 1000 每次批量处理数量。如果key过大/小,可以相应的调整
timeout 60000 每次执行最大超时时间, 单位毫秒(ms)
include 要包含的 key, 支持正则表达式
exclude 要排除的 key,支持正则表达式

SqlServerWriter 插件文档#

1 快速介绍#

SqlServerWriter 插件实现了写入数据到 SqlServer 库的目的表的功能。在底层实现上, SqlServerWriter 通过 JDBC 连接远程 SqlServer 数据库,并执行相应的 insert into ... sql 语句将数据写入 SqlServer,内部会分批次提交入库。

SqlServerWriter 面向ETL开发工程师,他们使用 SqlServerWriter 从数仓导入数据到 SqlServer。同时 SqlServerWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

2 实现原理#

SqlServerWriter 通过 Addax 框架获取 Reader 生成的协议数据,根据你配置生成相应的SQL语句 insert into...(当主键/唯一性索引冲突时会写不进去冲突的行)

注意:

  1. 目的表所在数据库必须是主库才能写入数据;整个任务至少需具备 insert into... 的权限,是否需要其他权限,取决于你任务配置中在 preSqlpostSql 中指定的语句。

  2. SqlServerWriter和MysqlWriter不同,不支持配置writeMode参数。

3 功能说明#

3.1 配置样例#

这里使用一份从内存产生到 SqlServer 导入的数据。

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {},
        "writer": {
          "name": "sqlserverwriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "db_id",
              "db_type",
              "db_ip",
              "db_port",
              "db_role",
              "db_name",
              "db_username",
              "db_password",
              "db_modify_time",
              "db_modify_user",
              "db_description",
              "db_tddl_info"
            ],
            "connection": [
              {
                "table": [
                  "db_info_for_writer"
                ],
                "jdbcUrl": "jdbc:sqlserver://[HOST_NAME]:PORT;DatabaseName=[DATABASE_NAME]"
              }
            ],
            "preSql": [
              "delete from @table where db_id = -1;"
            ],
            "postSql": [
              "update @table set db_modify_time = now() where db_id = 1;"
            ]
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
jdbcUrl 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
username 数据源的用户名
password 数据源指定用户名的密码
table 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 所配置的表中需要同步的列名集合,详细描述见rdbmswriter
splitPk 使用splitPk代表的字段进行数据分片,详细描述见rdbms
preSql 数据写入前先执行的sql语句
postSql 数据写入完成后,再执行的SQL语句
batchSize 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM
3.3 类型转换#

类似 SqlServerReader ,目前 SqlServerWriter 支持大部分 SqlServer 类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。

StreamWriter#

StreamWriter 是一个将数据写入内存的插件,一般用来将获取到的数据写到终端,用来调试读取插件的数据处理情况。

一个典型的 streamwriter 配置如下:

{
  "name": "streamwriter",
  "parameter": {
    "encoding": "UTF-8",
    "print": true
  }
}

上述配置会将获取的数据直接打印到终端。 该插件也支持将数据写入到文件,配置如下:

{
  "name": "streamwriter",
  "parameter": {
    "encoding": "UTF-8",
    "path": "/tmp/out",
    "fileName": "out.txt",
    "fieldDelimiter": ",",
    "recordNumBeforeSleep": "100",
    "sleepTime": "5"
  }
}

上述配置中:

fieldDelimiter 表示字段分隔符,默认为制表符(\t) recordNumBeforeSleep 表示获取多少条记录后,执行休眠,默认为0,表示不启用该功能

sleepTime 则表示休眠多长时间,单位为秒,默认为0,表示不启用该功能。

上述配置的含义是将数据写入到 /tmp/out/out.txt 文件, 每获取100条记录后,休眠5秒。

TDengineWriter#

TDengineWriter 插件实现了将数据写入到涛思公司的 TDengine 数据库系统。在底层实现上,TDengineWriter 通过JDBC JNI 驱动连接远程 TDengine 数据库, 并执行相应的sql语句将数据批量写入 TDengine 库中。

前置条件#

考虑到性能问题,该插件使用了 TDengine 的 JDBC-JNI 驱动, 该驱动直接调用客户端 API(libtaos.so 或 taos.dll)将写入和查询请求发送到taosd 实例。因此在使用之前需要配置好动态库链接文件。

首先将 plugin/writer/tdenginewriter/libs/libtaos.so.2.0.16.0 拷贝到 /usr/lib64 目录,然后执行下面的命令创建软链接

ln -sf /usr/lib64/libtaos.so.2.0.16.0 /usr/lib64/libtaos.so.1
ln -sf /usr/lib64/libtaos.so.1 /usr/lib64/libtaos.so

示例#

假定要写入的表如下:

create table test.addax_test (
    ts timestamp,
    name nchar(100),
    file_size int,
    file_date timestamp,
    flag_open bool,
    memo nchar(100)
);

以下是配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column" : [
              {
                "random":"2017-08-01 00:01:02,2020-01-01 12:13:14",
                "type": "date"
              },
              {
                "value": "Addax",
                "type": "string"
              },
              {
                "value": 19880808,
                "type": "long"
              },
              {
                "value": "1988-08-08 08:08:08",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "test",
                "type": "bytes"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": ["ts", "name", "file_size", "file_date", "flag_open", "memo" ],
            "connection": [
              {
                "jdbcUrl": "jdbc:TAOS://127.0.0.1:6030/test",
                "table": [ "addax_test"]
              }
            ]
          }
        }
      }
    ]
  }
}

将上述配置文件保存为 job/stream2tdengine.json

执行采集命令#

执行以下命令进行数据采集

bin/addax.py job/tdengine2stream.json

命令输出类似如下:

2021-02-20 15:52:07.691 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-20 15:52:07.748 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"column":[
						{
							"random":"2017-08-01 00:01:02,2020-01-01 12:13:14",
							"type":"date"
						},
						{
							"type":"string",
							"value":"Addax"
						},
						{
							"type":"long",
							"value":19880808
						},
						{
							"type":"date",
							"value":"1988-08-08 08:08:08"
						},
						{
							"type":"bool",
							"value":true
						},
						{
							"type":"bytes",
							"value":"test"
						}
					],
					"sliceRecordCount":1000
				},
				"name":"streamreader"
			},
			"writer":{
				"parameter":{
					"password":"*****",
					"column":[
						"ts",
						"name",
						"file_size",
						"file_date",
						"flag_open",
						"memo"
					],
					"connection":[
						{
							"jdbcUrl":"jdbc:TAOS://127.0.0.1:6030/test",
							"table":[
								"addax_test"
							]
						}
					],
					"username":"root",
					"preSql":[]
				},
				"name":"tdenginewriter"
			}
		}
	],
	"setting":{
		"speed":{
			"bytes":-1,
			"channel":1
		}
	}
}

2021-02-20 15:52:07.786 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-20 15:52:07.787 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-20 15:52:07.789 [main] INFO  JobContainer - Set jobId = 0
java.library.path:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2021-02-20 15:52:08.048 [job-0] INFO  OriginalConfPretreatmentUtil - table:[addax_test] all columns:[ts,name,file_size,file_date,flag_open,memo].
2021-02-20 15:52:08.056 [job-0] INFO  OriginalConfPretreatmentUtil - Write data [
INSERT INTO %s (ts,name,file_size,file_date,flag_open,memo) VALUES(?,?,?,?,?,?)
], which jdbcUrl like:[jdbc:TAOS://127.0.0.1:6030/test]

2021-02-20 15:52:11.158 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-20 15:52:07
任务结束时刻                    : 2021-02-20 15:52:11
任务总计耗时                    :                  3s
任务平均流量                    :           11.07KB/s
记录写入速度                    :            333rec/s
读出记录总数                    :                1000
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
jdbcUrl list 对端数据库的JDBC连接信息,注意,这里的 TAOS 必须大写 |
username string 数据源的用户名
password string 数据源指定用户名的密码
table list 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column list 所配置的表中需要同步的列名集合,详细描述见rdbmswriter
preSql list 数据写入钱先执行的sql语句,例如清除旧数据,如果 Sql 中有你需要操作到的表名称,可用 @table 表示
postSql list 数据写入完成后执行的sql语句,例如加上某一个时间戳
batchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起

类型转换#

目前 TDenginereader 支持 TDengine 所有类型,具体如下

Addax 内部类型 TDengine 数据类型
Long SMALLINT, TINYINT, INT, BIGINT, TIMESTAMP
Double FLOAT, DOUBLE
String BINARY, NCHAR
Boolean BOOL

当前支持版本#

TDengine 2.0.16

注意事项#

  • TDengine JDBC-JNI 驱动和动态库版本要求一一匹配,因此如果你的数据版本并不是 2.0.16,则需要同时替换动态库和插件目录中的JDBC驱动

  • TDengine 的时序字段(timestamp)默认最小值为 1500000000000,即 2017-07-14 10:40:00.0,如果你写入的时许时间戳小于该值,则会报错

TxtFileWriter 插件文档#

1 快速介绍#

TxtFileWriter提供了向本地文件写入类CSV格式的一个或者多个表文件。TxtFileWriter服务的用户主要在于Addax开发、测试同学。

写入本地文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。

2 功能与限制#

TxtFileWriter实现了从Addax协议转为本地TXT文件功能,本地文件本身是无结构化数据存储,TxtFileWriter如下几个方面约定:

  1. 支持且仅支持写入 TXT的文件,且要求TXT中shema为一张二维表。

  2. 支持类CSV格式文件,自定义分隔符。

  3. 支持文本压缩,现有压缩格式为gzip、bzip2。

  4. 支持多线程写入,每个线程写入不同子文件。

  5. 文件支持滚动,当文件大于某个size值或者行数值,文件需要切换。 [暂不支持]

我们不能做到:

  1. 单个文件不能支持并发写入。

3 功能说明#

3.1 配置样例#
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": [
              "/tmp/data"
            ],
            "encoding": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "long"
              },
              {
                "index": 1,
                "type": "boolean"
              },
              {
                "index": 2,
                "type": "double"
              },
              {
                "index": 3,
                "type": "string"
              },
              {
                "index": 4,
                "type": "date",
                "format": "yyyy.MM.dd"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "txtfilewriter",
          "parameter": {
            "path": "/tmp/result",
            "fileName": "luohw",
            "writeMode": "truncate",
            "dateFormat": "yyyy-MM-dd"
          }
        }
      }
    ]
  }
}
3.2 参数说明#
配置项 是否必须 默认值 描述
path 本地文件系统的路径信息,写入Path目录下属多个文件
fileName 写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名
writeMode FtpWriter写入前数据清理处理模式,支持 truncate, append, nonConflict ,详见下文
column 默认String类型 读取字段列表,type指定源数据的类型,详见下文
fieldDelimiter , 描述:读取的字段分隔符
compress 文本压缩类型,默认不压缩,支持压缩类型为 zip、lzo、lzop、tgz、bzip2
encoding utf-8 读取文件的编码配置
nullFormat \N 定义哪些字符串可以表示为null
dateFormat 日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd"
fileFormat text 文件写出的格式,包括csv, text两种, 详见下文
header text写出时的表头,示例 ['id', 'name', 'age']
writeMode#

写入前数据清理处理模式:

  • truncate,写入前清理目录下一fileName前缀的所有文件。

  • append,写入前不做任何处理,直接使用filename写入,并保证文件名不冲突。

  • nonConflict,如果目录下有fileName前缀的文件,直接报错。

fileFormat#

文件写出的格式,包括 csv 和 text 两种,csv是严格的csv格式,如果待写数据包括列分隔符,则会按照csv的转义语法转义,转义符号为双引号 "; text格式是用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不做转义。

3.3 类型转换#

本地文件本身不提供数据类型,该类型是Addax TxtFileWriter定义:

Addax 内部类型 本地文件 数据类型
Long Long
Double Double
String String
Boolean Boolean
Date Date

其中:

  • Long 是指本地文件文本中使用整形的字符串表示形式,例如"19901219"。

  • Double 是指本地文件文本中使用Double的字符串表示形式,例如"3.1415"。

  • Boolean 是指本地文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。

  • Date 是指本地文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。

Transformer 插件文档#

Transformer定义#

在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。Addax包含了完成的E(Extract)、T(Transformer)、L(Load)支持。

运行模型#

_images/transform-arch.pngimage

UDF 函数#

dx_substr#

dx_substr(idx, pos, length) -> str

参数

  • idx: 字段编号,对应record中第几个字段

  • pos: 字段值的开始位置

  • length: 目标字段长度

返回: 从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

dx_pad#

dx_pad(idx, flag, length, chr)

参数

  • idx: 字段编号,对应record中第几个字段

  • flag: "l","r", 指示是在头进行填充,还是尾进行填充

  • length: 目标字段长度

  • chr: 需要填充的字符

返回: 如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符

举例:

dx_pad(1,"l","4","A"), 如果column 1 的值为 xyz=> Axyz, 值为 xyzzzzz => xyzz
dx_pad(1,"r","4","A"), 如果column 1 的值为 xyz=> xyzA, 值为 xyzzzzz => xyzz

dx_replace#

dx_replace(idx, pos, length, str) -> str

参数

  • idx: 字段编号,对应record中第几个字段

  • pos: 字段值的开始位置

  • length: 需要替换的字段长度

  • str: 要替换的字符串

返回: 从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)

举例:

dx_replace(1,"2","4","****")  column 1的value为“addaxTest”=>"da****est"
dx_replace(1,"5","10","****")  column 1的value为“addaxTest”=>"data****"

dx_filter#

dx_filter(idx, operator, expr) -> str

参数:

  • idx: 字段编号,对应record中第几个字段

  • operator: 运算符, 支持 like, not like, >, =, <, >=, !=, <=

  • expr: 正则表达式(java正则表达式)、值

  • str: 要替换的字符串

返回:

  • 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于 >, =, <都是对字段直接compare的结果.

  • likenot like 是将字段转换成字符类型,然后和目标正则表达式进行全匹配。

  • >, =, <, >=, !=, <= ,按照类型进行比较, 数值类型按大小比较,字符及布尔类型按照字典序比较

  • 如果目标字段为空(null),对于 = null 的过滤条件,将满足条件,被过滤。!=null 的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和 not like,字段为null满足条件,被过滤。

举例

dx_filter(1,"like","dataTest")  
dx_filter(1,">=","10")  

关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。

dx_groovy#

dx_groovy(code, package) -> record

参数

  • coee: 符合 groovy 编码要求的代码

  • package: extraPackage, 列表或者为空

返回

Record 数据类型

注意:

  • dx_groovy 只能调用一次。不能多次调用。

  • groovy code 中支持 java.lang, java.util 的包,可直接引用的对象有 record,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.clas- tringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。

  • groovy code 中,返回更新过的 Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。

  • 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil

举例:

groovy 实现的 subStr

String code="Column column = record.getColumn(1);\n"+
        " String oriValue = column.asString();\n"+
        " String newValue = oriValue.substring(0, 3);\n"+
        " record.setColumn(1, new StringColumn(newValue));\n"+
        " return record;";
        dx_groovy(record);

groovy 实现的Replace

String code2="Column column = record.getColumn(1);\n"+
        " String oriValue = column.asString();\n"+
        " String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n"+
        " record.setColumn(1, new StringColumn(newValue));\n"+
        " return record;";

groovy 实现的Pad

String code3="Column column = record.getColumn(1);\n"+
        " String oriValue = column.asString();\n"+
        " String padString = \"12345\";\n"+
        " String finalPad = \"\";\n"+
        " int NeedLength = 8 - oriValue.length();\n"+
        "        while (NeedLength > 0) {\n"+
        "\n"+
        "            if (NeedLength >= padString.length()) {\n"+
        "                finalPad += padString;\n"+
        "                NeedLength -= padString.length();\n"+
        "            } else {\n"+
        "                finalPad += padString.substring(0, NeedLength);\n"+
        "                NeedLength = 0;\n"+
        "            }\n"+
        "        }\n"+
        " String newValue= finalPad + oriValue;\n"+
        " record.setColumn(1, new StringColumn(newValue));\n"+
        " return record;";

Job定义#

本例中,配置4个UDF。

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "My name is xxxx",
                "type": "string"
              },
              {
                "value": "password is Passw0rd",
                "type": "string"
              },
              {
                "value": 19890604,
                "type": "long"
              },
              {
                "value": "1989-06-04 00:00:00",
                "type": "date"
              },
              {
                "value": true,
                "type": "bool"
              },
              {
                "value": "test",
                "type": "bytes"
              },
              {
                "random": "0,10",
                "type": "long"
              }
            ],
            "sliceRecordCount": 10
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        },
        "transformer": [
          {
            "name": "dx_replace",
            "parameter": {
              "columnIndex": 0,
              "paras": [
                "11",
                "6",
                "wgzhao"
              ]
            }
          },
          {
            "name": "dx_substr",
            "parameter": {
              "columnIndex": 1,
              "paras": [
                "0",
                "12"
              ]
            }
          },
          {
            "name": "dx_map",
            "parameter": {
              "columnIndex": 2,
              "paras": [
                "^",
                "2"
              ]
            }
          },
          {
            "name": "dx_filter",
            "parameter": {
              "columnIndex": 6,
              "paras": [
                "<",
                "5"
              ]
            }
          }
        ]
      }
    ]
  }
}

计量和脏数据#

Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:

  • Transform的入参Record条数、字节数。

  • Transform的出参Record条数、字节数。

  • Transform的脏数据Record条数、字节数。

  • 如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。

  • 目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。

涉及到运行过程的计量数据展现定义如下:

Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。

涉及到最终作业的计量数据展现定义如下:

任务启动时刻                    : 2015-03-10 17:34:21
任务结束时刻                    : 2015-03-10 17:34:31
任务总计耗时                    :                 10s
任务平均流量                    :            2.10MB/s
记录写入速度                    :         100000rec/s
转换输入总数                    :             1000000
转换输出总数                    :             1000000
读出记录总数                    :             1000000
同步失败总数                    :                   0

注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。

任务结果上报服务器功能 说明#

快速介绍#

主要用于将定时任务的结果上报给指定服务器

功能与限制#

  1. 支付http协议,JSON格式。

  2. 接口地址配置在 core.json 文件下的 core.dataXServer.address 下。

  3. 异步发送。

  4. 需要引入httpclient-4.5.2.jar,httpcore-4.4.5.jar,httpcore-nio-4.4.5.jar,httpasyncclient-4.1.2.jar 相关jar包

功能说明#

配置样例#

{
  "jobName": "test",
  "startTimeStamp": 1587971621,
  "endTimeStamp": 1587971621,
  "totalCosts": 10,
  "byteSpeedPerSecond": 33,
  "recordSpeedPerSecond": 1,
  "totalReadRecords": 6,
  "totalErrorRecords": 0
}

参数说明#

参数 描述 必选 默认值
jobName 任务名 jobName
startTimeStamp 任务执行的开始时间
endTimeStamp 任务执行的结束时间
totalCosts 任务总计耗时
byteSpeedPerSecond 任务平均流量
recordSpeedPerSecond 记录写入速度
totalReadRecords 读出记录总数 0
totalErrorRecords 读写失败总数 0

jobName 的设置规则如下:

  1. 在命令行通过传递 -P-DjobName=xxxx 方式指定,否则

  2. 配置文件的 writer.parameters.path 值按 / 分割后取第2,3列用点(.)拼接而成,其含义是为库名及表名,否则

  3. 否则设置为 jobName

Addax插件开发宝典#

本文面向Addax插件开发人员,尝试尽可能全面地阐述开发一个Addax插件所经过的历程,力求消除开发者的困惑,让插件开发变得简单。

Addax 为什么要使用插件机制#

从设计之初,Addax 就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,Addax 自然而然地采用了 框架 + 插件 的模式:

  • 插件只需关心数据的读取或者写入本身。

  • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

作为插件开发人员,则需要关注两个问题:

  1. 数据源本身的读写数据正确性。

  2. 如何与框架沟通、合理正确地使用框架。

插件视角看框架#

逻辑执行模型#

插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:

  • Job: Job 是Addax用以描述从一个源头到一个目的端的同步作业,是Addax数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。

  • Task: Task 是为最大化而把 Job 拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的 Job,拆分成1024个读 Task,用若干个并发执行。

  • TaskGroup: 描述的是一组 Task 集合。在同一个TaskGroupContainer执行下的Task集合称之为 TaskGroup

  • JobContainer: Job 执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker

  • TaskGroupContainer: TaskGroup 执行器,负责执行一组 Task 的工作单元,类似Yarn中的TaskTracker。

简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现 JobTask 两部分逻辑

物理执行模型#

框架为插件提供物理上的执行能力(线程)。Addax 框架有三种运行模式:

  • Standalone: 单进程运行,没有外部依赖。

  • Local: 单进程运行,统计信息、错误信息汇报到集中存储。

  • Distrubuted: 分布式多进程运行,依赖 Addax Service 服务。

当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。 当 JobContainerTaskGroupContainer 运行在同一个进程内时,就是单机模式(StandaloneLocal);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。

编程接口#

那么,JobTask 的逻辑应是怎么对应到具体的代码中的?

首先,插件的入口类必须扩展 ReaderWriter 抽象类,并且实现分别实现 JobTask 两个内部抽象类,JobTask 的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例:

public class SomeReader
        extends Reader
{
    public static class Job
            extends Reader.Job
    {
        @Override
        public void init()
        {
        }

        @Override
        public void prepare()
        {
        }

        @Override
        public List<Configuration> split(int adviceNumber)
        {
            return null;
        }

        @Override
        public void post()
        {
        }

        @Override
        public void destroy()
        {
        }
    }

    public static class Task
            extends Reader.Task
    {

        @Override
        public void init()
        {
        }

        @Override
        public void prepare()
        {
        }

        @Override
        public void startRead(RecordSender recordSender)
        {
        }

        @Override
        public void post()
        {
        }

        @Override
        public void destroy()
        {
        }
    }
}

Job接口功能如下:

  • init: Job对象初始化工作,测试可以通过super.getPluginJobConf()获取与本插件相关的配置。读插件获得配置中reader部分,写插件获得writer部分。

  • prepare: 全局准备工作,比如odpswriter清空目标表。

  • split: 拆分Task。参数adviceNumber框架建议的拆分数,一般是运行时所配置的并发度。值返回的是Task的配置列表。

  • post: 全局的后置工作,比如mysqlwriter同步完影子表后的rename操作。

  • destroy: Job对象自身的销毁工作。

Task 接口功能如下:

  • init:Task对象的初始化。此时可以通过super.getPluginJobConf()获取与本Task相关的配置。这里的配置是Jobsplit方法返回的配置列表中的其中一个。

  • prepare:局部的准备工作。

  • startRead: 从数据源读数据,写入到RecordSender中。RecordSender会把数据写入连接Reader和Writer的缓存队列。

  • startWrite:从RecordReceiver中读取数据,写入目标数据源。RecordReceiver中的数据来自Reader和Writer之间的缓存队列。

  • post: 局部的后置工作。

  • destroy: Task象自身的销毁工作。

需要注意的是:

  • JobTask之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。

  • preparepostJobTask中都存在,插件需要根据实际情况确定在什么地方执行操作。

框架按照如下的顺序执行 JobTask 的接口:

_images/plugin_dev_guide_1.pngAddaxReaderWriter

上图中,黄色表示Job部分的执行阶段,蓝色表示Task部分的执行阶段,绿色表示框架执行阶段。

相关类关系如下:

_images/plugin_dev_guide_2.pngAddax

插件定义#

代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?

在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:

{
  "name": "mysqlwriter",
  "class": "com.wgzhao.addax.plugin.writer.mysqlwriter.MysqlWriter",
  "description": "Use Jdbc connect to database, execute insert sql.",
  "developer": "wgzhao"
}
  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要

  • class: 入口类的全限定名称,框架通过反射穿件入口类的实例。十分重要

  • description: 描述信息。

  • developer: 开发人员。

打包发布#

Addax 使用 assembly 打包,打包命令如下:

mvn clean package -DskipTests assembly:single

Addax插件需要遵循统一的目录结构:

${DATAX_HOME}
|-- bin
|   `-- addax.py
|-- conf
|   |-- core.json
|   `-- logback.xml
|-- lib
|   `-- addax-core-dependencies.jar
`-- plugin
    |-- reader
    |   `-- mysqlreader
    |       |-- libs
    |       |   `-- mysql-reader-plugin-dependencies.jar
    |       |-- mysqlreader-0.0.1-SNAPSHOT.jar
    |       `-- plugin.json
    `-- writer
        |-- mysqlwriter
        |   |-- libs
        |   |   `-- mysql-writer-plugin-dependencies.jar
        |   |-- mysqlwriter-0.0.1-SNAPSHOT.jar
        |   `-- plugin.json
        |-- oceanbasewriter
        `-- odpswriter
  • ${DATAX_HOME}/bin: 可执行程序目录。

  • ${DATAX_HOME}/conf: 框架配置目录。

  • ${DATAX_HOME}/lib: 框架依赖库目录。

  • ${DATAX_HOME}/plugin: 插件目录。

插件目录分为readerwriter子目录,读写插件分别存放。插件目录规范如下:

  • ${PLUGIN_HOME}/libs: 插件的依赖库。

  • ${PLUGIN_HOME}/plugin-name-version.jar: 插件本身的jar。

  • ${PLUGIN_HOME}/plugin.json: 插件描述文件。

尽管框架加载插件时,会把 ${PLUGIN_HOME} 下所有的jar放到 classpath,但还是推荐依赖库的jar和插件本身的jar分开存放。

注意:

插件的目录名字必须和 plugin.json 中定义的插件名称一致。

配置文件#

Addax 使用 json 作为配置文件的格式。一个典型的 Addax 任务配置如下:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "odpsreader",
          "parameter": {
            "accessKey": "",
            "accessId": "",
            "column": [
              ""
            ],
            "isCompress": "",
            "odpsServer": "",
            "partition": [
              ""
            ],
            "project": "",
            "table": "",
            "tunnelServer": ""
          }
        },
        "writer": {
          "name": "oraclewriter",
          "parameter": {
            "username": "",
            "password": "",
            "column": [
              "*"
            ],
            "connection": [
              {
                "jdbcUrl": "",
                "table": [
                  ""
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

Addax 框架有 core.json 配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖 core.json 中的默认值。

配置中job.content.reader.parameter的value部分会传给Reader.Jobjob.content.writer.parameter的value部分会传给Writer.JobReader.JobWriter.Job可以通过super.getPluginJobConf()来获取。

Addax框架支持对特定的配置项进行RSA加密,例子中以*开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*的key来查询配置和操作配置项

如何设计配置参数#

配置文件的设计是插件开发的第一步!

任务配置中readerwriterparameter部分是插件的配置参数,插件的配置参数应当遵循以下原则:

  • 驼峰命名:所有配置项采用驼峰命名法,首字母小写,单词首字母大写。

  • 正交原则:配置项必须正交,功能没有重复,没有潜规则。

  • 富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。

    • 使用正确的数据类型。比如,bool类型的值使用 true/false,而非 "yes"/"true"/0 等。

    • 合理使用集合类型,比如,用数组替代有分隔符的字符串。

  • 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的 connection 参数都是如下结构:

    {
      "connection": [
        {
          "table": [
            "table_1",
            "table_2"
          ],
          "jdbcUrl": [
            "jdbc:mysql://127.0.0.1:3306/database_1",
            "jdbc:mysql://127.0.0.2:3306/database_1_slave"
          ]
        },
        {
          "table": [
            "table_3",
            "table_4"
          ],
          "jdbcUrl": [
            "jdbc:mysql://127.0.0.3:3306/database_2",
            "jdbc:mysql://127.0.0.4:3306/database_2_slave"
          ]
        }
      ]
    }
    

如何使用Configuration#

为了简化对json的操作,Addax提供了简单的DSL配合Configuration类使用。

Configuration提供了常见的get, 带类型get带默认值getset等读写配置项的操作,以及clone, toJSON等方法。配置项读写操作都需要传入一个path做为参数,这个path就是Addax定义的DSL。语法有两条:

  1. 子map用.key表示,path的第一个点省略。

  2. 数组元素用[index]表示。

比如操作如下json:

{
  "a": {
    "b": {
      "c": 2
    },
    "f": [
      1,
      2,
      {
        "g": true,
        "h": false
      },
      4
    ]
  },
  "x": 4
}

比如调用configuration.get(path)方法,当path为如下值的时候得到的结果为:

  • x4

  • a.b.c2

  • a.b.c.dnull

  • a.b.f[0]1

  • a.b.f[2].gtrue

注意,因为插件看到的配置只是整个配置的一部分。使用Configuration对象时,需要注意当前的根路径是什么。

更多Configuration的操作请参考ConfigurationTest.java

插件数据传输#

跟一般的 生产者-消费者 模式一样,Reader 插件和 Writer 插件之间也是通过 channel 来实现数据的传输的。channel 可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSenderchannel写入数据,通过RecordReceiverchannel读取数据。

channel 中的一条数据为一个 Record 的对象,Record 中可以放多个 Column 对象,这可以简单理解为数据库中的记录和列。

Record有如下方法:

public interface Record
{
    // 加入一个列,放在最后的位置
    void addColumn(Column column);

    // 在指定下标处放置一个列
    void setColumn(int i, final Column column);

    // 获取一个列
    Column getColumn(int i);

    // 转换为json String
    String toString();

    // 获取总列数
    int getColumnNumber();

    // 计算整条记录在内存中占用的字节数
    int getByteSize();
}

因为Record是一个接口,Reader插件首先调用RecordSender.createRecord()创建一个Record实例,然后把Column一个个添加到Record中。

Writer 插件调用RecordReceiver.getFromReader()方法获取Record,然后把Column遍历出来,写入目标存储中。当Reader尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()方法会阻塞直到有数据。如果传输已经结束,会返回nullWriter 插件可以据此判断是否结束startWrite方法。

Column 的构造和操作,我们在《类型转换》一节介绍。

类型转换#

为了规范源端和目的端类型转换操作,保证数据不失真,Addax支持六种内部数据类型:

  • Long:定点数(Int、Short、Long、BigInteger等)。

  • Double:浮点数(Float、Double、BigDecimal(无限精度)等)。

  • String:字符串类型,底层不限长,使用通用字符集(Unicode)。

  • Date:日期类型。

  • Bool:布尔值。

  • Bytes:二进制,可以存放诸如MP3等非结构化数据。

对应地,有DateColumnLongColumnDoubleColumnBytesColumnStringColumnBoolColumn六种Column的实现。

Column除了提供数据相关的方法外,还提供一系列以as开头的数据类型转换转换方法。

_images/plugin_dev_guide_3.pngColumns

Addax的内部类型在实现上会选用不同的java类型:

内部类型 实现类型 备注
Date java.util.Date
Long java.math.BigInteger 使用无限精度的大整数,保证不失真
Double java.lang.String 用String表示,保证不失真
Bytes byte[]
String java.lang.String
Bool java.lang.Boolean

类型之间相互转换的关系如下:

from/to Date Long Double Bytes String Bool
Date - 使用毫秒时间戳 不支持 不支持 使用系统配置的date/time/datetime格式转换 不支持
Long 作为毫秒时间戳构造Date - BigInteger转为BigDecimal,然后BigDecimal.doubleValue() 不支持 BigInteger.toString() 0为false,否则true
Double 不支持 内部String构造BigDecimal,然后BigDecimal.longValue() - 不支持 直接返回内部String
Bytes 不支持 不支持 不支持 - 按照common.column.encoding配置的编码转换为String,默认utf-8 不支持
String 按照配置的date/time/datetime/extra格式解析 用String构造BigDecimal,然后取longValue() 用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity 按照common.column.encoding配置的编码转换为byte[],默认utf-8 - "true"为true, "false"为false,大小写不敏感。其他字符串不支持
Bool 不支持 true1L,否则0L true1.0,否则0.0 不支持 -

脏数据处理#

什么是脏数据#

目前主要有三类脏数据:

  1. Reader读到不支持的类型、不合法的值。

  2. 不支持的类型转换,比如:Bytes转换为Date

  3. 写入目标端失败,比如:写mysql整型长度超长。

如何处理脏数据#

Reader.TaskWriter.Task中,功过AbstractTaskPlugin.getPluginCollector()可以拿到一个TaskPluginCollector,它提供了一系列collectDirtyRecord的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord方法,把被认为是脏数据的Record传入即可。

用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。

加载原理#

  1. 框架扫描plugin/readerplugin/writer目录,加载每个插件的plugin.json文件。

  2. plugin.json文件中name为key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。

  3. 用户在插件中在reader/writer配置的name字段指定插件名字。框架根据插件的类型(reader/writer)和插件名称去插件的路径下扫描所有的jar,加入classpath

  4. 根据插件配置中定义的入口类,框架通过反射实例化对应的JobTask对象。