KuduWriter#

KuduWriter 插件实现了将数据写入到 kudu 的能力,当前是通过调用原生RPC接口来实现的。 后期希望通过 impala 接口实现,从而增加更多的功能。

示例#

以下示例演示了如何从内存读取样例数据并写入到 kudu 表中的。

表结构#

我们用 trino 工具连接到 kudu 服务,然后通过下面的 SQL 语句创建表

CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar,
  salary double
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

job 配置文件#

创建 job/stream2kudu.json 文件,内容如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "random": "1,1000",
                "type": "long"
              },
              {
                "random": "1,10",
                "type": "string"
              },
              {
                "random": "1000,50000",
                "type": "double"
              }
            ],
            "sliceRecordCount": 1000
          }
        },
        "writer": {
          "name": "kuduwriter",
          "parameter": {
            "masterAddress": "127.0.0.1:7051,127.0.0.1:7151,127.0.0.1:7251",
            "timeout": 60,
            "table": "users",
            "writeMode": "upsert",
            "column": [
              {"name":"user_id","type":"int8"},
              {"name":"user_name", "type":"string"},
              {"name":"salary", "type":"double"}
            ],
            "batchSize": 1024,
            "bufferSize": 2048,
            "skipFail": false,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

运行#

执行下下面的命令进行数据采集

bin/addax.py job/stream2kudu.json

参数说明#

配置项 是否必须 类型 默认值 描述
masterAddress 必须 string Kudu Master集群RPC地址,多个地址用逗号(,)分隔
table 必须 string kudu 表名
writeMode string upsert 表数据写入模式,支持 upsert, insert 两者
timeout int 60 写入数据超时时间(秒)
column list 要写入的表字段及类型,如果配置为 "*" ,则会从目标表中读取所有字段
skipFail boolean false 是否跳过插入失败的记录,如果设置为true,则插件不会把插入失败的当作异常

已知限制#

  1. 暂时不支持 truncate table