ElasticSearchReader#

ElasticSearchReader 插件实现了从 Elasticsearch 读取索引的功能, 它通过 Elasticsearch 提供的 Rest API (默认端口9200),执行指定的查询语句批量获取数据

示例#

假定要获取的索引内容如下

{
"took": 14,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "test-1",
"_type": "default",
"_id": "38",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
},
{
"_index": "test-1",
"_type": "default",
"_id": "103",
"_score": 1,
"_source": {
"col_date": "2017-05-25T11:22:33.000+08:00",
"col_integer": 19890604,
"col_keyword": "hello world",
"col_ip": "1.1.1.1",
"col_text": "long text",
"col_double": 19890604,
"col_long": 19890604,
"col_geo_point": "41.12,-71.34"
}
}
]
}
}

配置一个从 Elasticsearch 读取数据并打印到终端的任务

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId":"",
            "accesskey":"",
            "index": "test-1",
            "type": "default",
            "searchType": "dfs_query_then_fetch",
            "headers": {},
            "scroll": "3m",
            "search": [
              {
                "query": {
                  "match": {
                    "col_ip": "1.1.1.1"
                  }
                },
                "aggregations": {
                  "top_10_states": {
                    "terms": {
                      "field": "col_date",
                      "size": 10
                    }
                  }
                }
              }
            ],
            "column": ["col_ip", "col_double", "col_long","col_integer",
              "col_keyword", "col_text","col_geo_point","col_date"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

将上述内容保存为 job/es2stream.json

执行下面的命令进行采集

bin/addax.py job/es2stream.json

其输出结果类似如下(输出记录数有删减)

2021-02-19 13:38:15.860 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO  Engine -
{
	"content":[
		{
			"reader":{
				"parameter":{
					"accessId":"",
					"headers":{},
					"endpoint":"http://127.0.0.1:9200",
					"search":[
                      {
                        "query": {
                          "match": {
                            "col_ip": "1.1.1.1"
                          }
                        },
                        "aggregations": {
                          "top_10_states": {
                            "terms": {
                              "field": "col_date",
                              "size": 10
                            }
                          }
                        }
                      }
					],
					"accesskey":"*****",
					"searchType":"dfs_query_then_fetch",
					"scroll":"3m",
					"column":[
						"col_ip",
						"col_double",
						"col_long",
						"col_integer",
						"col_keyword",
						"col_text",
						"col_geo_point",
						"col_date"
					],
					"index":"test-1",
					"type":"default"
				},
				"name":"elasticsearchreader"
			},
			"writer":{
				"parameter":{
					"print":true,
					"encoding":"UTF-8"
				},
				"name":"streamwriter"
			}
		}
	],
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"byte":-1,
			"channel":1
		}
	}
}

2021-02-19 13:38:15.934 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO  JobContainer - Set jobId = 0

2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34
2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34

2021-02-19 13:38:19.845 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO  JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-19 13:38:15
任务结束时刻                    : 2021-02-19 13:38:19
任务总计耗时                    :                  3s
任务平均流量                    :            2.84KB/s
记录写入速度                    :             31rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

参数说明#

配置项 是否必须 类型 默认值 描述
endpoint string ElasticSearch的连接地址
accessId string "" http auth中的user
accessKey string "" http auth中的password
index string elasticsearch中的index名
type string index名 elasticsearch中index的type名
search list [] json格式api搜索数据体
column list 需要读取的字段
timeout int 60 客户端超时时间(单位:秒)
discovery boolean false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression boolean true http请求,开启压缩
multiThread boolean true http请求,是否有多线程
searchType string dfs_query_then_fetch 搜索类型
headers map {} http请求头
scroll string "" 滚动分页配置

searchType#

searchType 目前支持以下几种:

  • dfs_query_then_fetch

  • query_then_fetch

  • count

  • scan