Original file data format: each line is one ES document
{"_index":"logstash-tencent.com_main","_type":"_doc","_id":"gMkeAogBQ9Wx5CLJRkyu","_score":1,"_source":{"email_address":"208xxxxxxxx@qq.com","phone":"185xxxxxxxx","qq":"208xxxxxxxx","message":"208xxxxxxxx@qq.com~~~185xxxxxxxx~~~208xxxxxxxx","path":"/home/data/sql_outputs/argosy/new/tencent.com.csv","host":"storage","@version":"1","@timestamp":"2023-05-09T22:04:23Z"}}
{"_index":"logstash-tencent.com_main","_type":"_doc","_id":"gckeAogBQ9Wx5CLJRkyu","_score":1,"_source":{"email_address":"1332xxxxxx@qq.com","phone":"157xxxxx505","qq":"1332xxxxxx","message":"1332xxxxxx@qq.com~~~157xxxxx505~~~1332xxxxxx","path":"/home/data/sql_outputs/argosy/new/tencent.com.csv","host":"storage","@version":"1","@timestamp":"2023-05-09T22:04:23Z"}}
{"_index":"logstash-tencent.com_main","_type":"_doc","_id":"gskeAogBQ9Wx5CLJRkyu","_score":1,"_source":{"email_address":"146xxxxxxxx@qq.com","phone":"183xxxxxxxx","qq":"146xxxxxxxx","message":"146xxxxxxxx@qq.com~~~183xxxxxxxx~~~146xxxxxxxx","path":"/home/data/sql_outputs/argosy/new/tencent.com.csv","host":"storage","@version":"1","@timestamp":"2023-05-09T22:04:23Z"}}
Create the ES index
PUT http://127.0.0.1:9200/logstash-tencent.com_main
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "phone": {
        "type": "keyword"
      },
      "qq": {
        "type": "keyword"
      },
      "@timestamp": {
        "type": "date"
      }
    }
  }
}
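Creating the index explicitly up front pins phone and qq as keyword so they are stored unanalyzed. To confirm the index came up with the intended mapping, a quick check (assuming the same local node as above) is:

curl -XGET "http://127.0.0.1:9200/logstash-tencent.com_main/_mapping?pretty"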
Split the original file into chunks of 600,000 lines each; output files are prefixed with ori_tencent_
split -l 600000 tencent.com ori_tencent_
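split appends two-letter suffixes to the given prefix, so the chunk files come out like this (illustrative listing):

ls ori_tencent_*
ori_tencent_aa  ori_tencent_ab  ori_tencent_ac  ...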
Clean the data, then import it into ES
for file in ori_tencent_*; do
  # Build the output filename: es_bulk_ prefix plus a .json suffix
  output_file="es_bulk_${file}.json"
  jq -c 'del(._type,._score,._source.path,._source.host,._source.message,._source.email_address,._source."@version")' "$file" | awk -F '"_index":"|","_id":"|","_source"' '{
    json_end = length($0);
    data_start = index($0, "{\"phone");
    # Keep only the _source object; the length arithmetic drops the outer closing "}"
    json_data = substr($0, data_start, json_end - data_start);
    print "{\"index\":{\"_index\":\"" $2 "\", \"_id\":\"" $3 "\"}}\n" json_data }' > "$output_file"
  echo "Processed $file into $output_file"
  # Import into ES
  curl -s -o /dev/null -H "Content-Type: application/json" -XPOST "localhost:9200/logstash-tencent.com_main/_bulk?refresh" --data-binary @"$output_file"
done
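For the first sample document above, the generated _bulk file contains an action line followed by the cleaned document, roughly like this (values abbreviated as in the sample):

{"index":{"_index":"logstash-tencent.com_main", "_id":"gMkeAogBQ9Wx5CLJRkyu"}}
{"phone":"185xxxxxxxx","qq":"208xxxxxxxx","@timestamp":"2023-05-09T22:04:23Z"}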
Explanation:

- for file in ori_tencent_*; do : iterates over every file in the current directory whose name starts with ori_tencent_.
- output_file="es_bulk_${file}.json" : builds the output filename by adding the es_bulk_ prefix and the .json suffix. For example, the chunk ori_tencent_aa produces es_bulk_ori_tencent_aa.json.
- The jq and awk commands: each line is first cleaned by jq, which deletes the unneeded fields so that only phone, qq, and @timestamp remain in _source; awk then converts each line into the Elasticsearch _bulk format and writes it to the corresponding _bulk file. -F sets multiple field separators ("_index":", ","_id":", ","_source") so the line splits cleanly into fields; $2 and $3 are the _index and _id values that fall between those separators.
- echo "Processed $file into $output_file" : prints a message confirming which file was processed and which _bulk file was generated.
- Finally, the curl command reads the generated JSON files and imports the data into ES; a quick sanity check follows below.
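After all chunks are imported, the document count can be compared against the line count of the original file (a minimal check, assuming the original file is tencent.com as above; counts are immediately visible because the bulk requests used ?refresh):

curl -XGET "localhost:9200/logstash-tencent.com_main/_count?pretty"
wc -l tencent.com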