数据导入elasticsearch的插件
使用elasticsearch的rest api接口, 批量把从reader读入的数据写入elasticsearch
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
...
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://xxx:9999",
"accessId": "xxxx",
"accessKey": "xxxx",
"index": "test-1",
"type": "default",
"cleanup": true,
"settings": {"index" :{"number_of_shards": 1, "number_of_replicas": 0}},
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"column": [
{"name": "pk", "type": "id"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text", "analyzer": "ik_max_word"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
{ "name": "col_nested1", "type": "nested" },
{ "name": "col_nested2", "type": "nested" },
{ "name": "col_object1", "type": "object" },
{ "name": "col_object2", "type": "object" },
{ "name": "col_integer_array", "type":"integer", "array":true},
{ "name": "col_geo_shape", "type":"geo_shape", "tree": "quadtree", "precision": "10m"}
]
}
}
}
]
}
}
endpoint
accessId
accessKey
index
type
cleanup
batchSize
trySize
timeout
discovery
compression
multiThread
ignoreWriteError
ignoreParseError
alias
aliasMode
settings
splitter
column
dynamic
{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
通道数 | 批量提交行数 | DataX速度(Rec/s) | DataX流量(MB/s) |
---|---|---|---|
4 | 256 | 11013 | 0.828 |
4 | 1024 | 19417 | 1.43 |
4 | 4096 | 23923 | 1.76 |
4 | 8172 | 24449 | 1.80 |
8 | 256 | 21459 | 1.58 |
8 | 1024 | 37037 | 2.72 |
8 | 4096 | 45454 | 3.34 |
8 | 8172 | 45871 | 3.37 |
16 | 1024 | 67567 | 4.96 |
16 | 4096 | 78125 | 5.74 |
16 | 8172 | 77519 | 5.69 |
32 | 1024 | 94339 | 6.93 |
32 | 4096 | 96153 | 7.06 |
64 | 1024 | 91743 | 6.74 |