logstash运行时默认就有一个main pipline
如果我们 在配置文件中 ,写 if else ,处理多个文件或者input , 如果业务逻辑比较多,那么程序的可读性比较差。所以我们用pipeline 来执行一个,然后处理多个input。
数据准备:
ES集群
/etc/logstash/pipelines.yml
每一个启动任务 配置单独的id
- pipeline.id: baimei-bulk
path.config: "/root/config/beats_to_logstash.conf"
- pipeline.id : baimei-txt
path.config: "/root/config/txt.conf"
- pipeline.id : baimei-json
path.config: "/root/config/json.conf"
启动: 直接输入 logstash 就可以了。
logstash
如果不用pipeline 还可以用 配置 路径的方式:
注意这个地方, 会报错: 找不到 configer 配置文件。
需要操作:
mkdir /usr/share/logstash/config
ln -sv /etc/logstash/pipelines.yml /usr/share/logstash/config/
接下来我们操作按一个案例:
准备数据 json 数据:txt 数据:bulk 数据
用logstash 处理3中不同的数据, 然后用pipline 来运行。
1.json 数据:
{ "ip_addr": "202.189.3.253" ,"title":"果宾斯泰国进口金枕榴莲 新鲜水果 时令生鲜 金枕榴莲3-4斤(精品果1个装)","price":148.00,"brand":"果宾斯","item":"https://item.jd.com/10069719921889.html?bbtf=1","group":6,"author":"葛念"}
{ "ip_addr": "143.198.202.89" ,"title":"佳农 马来西亚冷冻 猫山王榴莲果肉 D197 300g/盒 榴莲 生鲜水果","price":116.90,"brand":"猫山王","item":"https://item.jd.com/100018271799.html","group":6,"author":"葛念"}
{ "ip_addr": "159.223.94.183" ,"title":"绿鲜森榴莲泰国新鲜金枕头现摘带壳鲜果生鲜进口水果 8-10斤(2到4个)","price":239.90,"brand":"金枕头","item":"https://item.jd.com/10072663295057.html","group":6,"author":"葛念"}
{ "ip_addr": "170.64.158.198" ,"title":"四叔公果蔬黑刺榴莲【顺丰】带壳榴莲品种液氮冷冻新鲜树熟黑刺榴莲 【品质优果】5.7-6.2斤","price":1359.00,"brand":"黑刺榴莲","item":"https://item.jd.com/10047093382701.html","group":6,"author":"葛念"}
{ "ip_addr": "143.198.150.19" ,"title":"猫宝黑刺榴莲马来西亚稀有品种高山树上熟榴莲 新鲜液氮冷冻水果D200 5.5-6斤(保5房)","price":1258.00,"brand":"猫宝黑刺榴莲","item":"https://item.jd.com/10067368440468.html","group":6,"author":"葛念"}
{ "ip_addr": "165.232.185.147" ,"title":"怡仙觅 马来西亚猫山王榴莲D197液氮冷冻新鲜水果带壳进口顺丰空运 6.5-7斤(保五房肉 )1-2个果","price":1098.00,"brand":"猫山王榴莲","item":"https://item.jd.com/10057157222787.html","group":6,"author":"葛念"}
{ "ip_addr": "206.189.143.93" ,"title":"猫山王榴莲D197冷冻榴莲 稀有果6.0斤以上","price":1099.00,"brand":"猫山王榴莲","item":"https://item.jd.com/10040539185728.html","group":6,"author":"葛念"}
{ "ip_addr": "165.232.182.114" ,"title":"田良季马来西亚进口黑刺D200榴莲猫山王冻果肉榴莲水果送礼品物盒端午节 【百里挑一】帝王级6.2斤-6.7斤","price":1999.00,"brand":"猫山王榴莲","item":"https://item.jd.com/10031404635057.html","group":6,"author":"葛念"}
{ "ip_addr": "195.226.194.242" ,"title":"骄瑰精选品质 马来西亚猫山王榴莲新鲜冷冻保鲜整果带壳 整箱20斤(大满足 4、5个大果 绿壳)","price":2688.00,"brand":"猫山王榴莲","item":"https://item.jd.com/10040539185728.html","group":6,"author":"葛念"}
{ "ip_addr": "195.226.194.142" ,"title":"采吟新货马来西亚冷冻D197猫山王榴莲整箱20斤双A带壳整颗4-5个 一箱4-5个20斤 20斤","price":2688.00,"brand":"猫山王榴莲","item":"https://item.jd.com/10069444215179.html","group":6,"author":"葛念"}
1.txt 数据:
ip_addr,129.157.137.130,title,曲奇,price,954,brand,无敌难吃,item,https://item.jd.com/10066410938387.html,group,1,author,赵本国
ip_addr,74.58.235.173,title,乐事,price,25,brand,乐视,item,https://item.jd.com/963181.html,group,1,author,赵本国
ip_addr,166.208.17.58,title,百草味蔬菜零食大礼包,price,19,brand,百草味,item,https://item.jd.com/100008511039.html,group,1,author,赵本国
ip_addr,27.80.79.85,title,樵子朗,price,50,brand,乐视,item,https://item.jd.com/100020662205.html,group,1,author,赵本国
ip_addr,160.135.117.120,title,奶酪帮1岁宝宝吃的零食,price,50,brand,没品牌,item,https://item.jd.com/10037558904064.html,group,1,author,赵本国
ip_addr,93.246.210.234,title,小米卡齐吧,price,50,brand,卡其,item,https://item.jd.com/67756937399.html,group,1,author,赵本国
ip_addr,196.227.186.164,title,方便面,price,50,brand,八千里豆腐,item,https://item.jd.com/10032362327665.html,group,1,author,赵本国
ip_addr,151.85.6.63,title,超辣拉稀毛肚,price,50,brand,魔域毛肚,item,https://item.jd.com/10067842566286.html,group,1,author,赵本国
ip_addr,189.77.120.106,title,麻婆豆腐,price,50,brand,千里香,item,https://item.jd.com/10066594014087.html,group,1,author,赵本国
ip_addr,135.41.92.37,title,铁观音大豆腐干,price,50,brand,肉棒级别,item,https://item.jd.com/10069811335080.html,group,1,author,赵本国
1.bulk 数据
{ "create": { "_index": "baimei-shopping"} }
{ "ip_addr": "211.144.24.221" ,"title":"赞鹰防弹插板防弹钢板 NIJ IV级碳化硅陶瓷PE复合防弹胸插板2.45KG","price":950,"brand":"赞鹰","item":"https://item.jd.com/100054634181.html?bbtf=1","group":2,"author":"刘澔橦"}
{ "create": { "_index": "baimei-shopping"} }
{ "ip_addr": "211.144.24.221" ,"title":"凯夫拉防弹衣防弹背心战术防暴服 三级凯夫拉防弹衣","price":1700,"brand":"赞鹰","item":"https://item.jd.com/100045951975.html?bbtf=1#crumb-wrap","group":2,"author":"刘澔橦"}
{ "create": { "_index": "baimei-shopping"} }
{ "ip_addr": "211.144.24.221" ,"title":"南极人(Nanjiren)短袖t恤男夏季运动速干冰丝小网眼体恤衣服汗衫半袖上衣t恤男XL","price":49,"brand":"南极人","item":"https://item.jd.com/100043918428.html?bbtf=1","group":2,"author":"刘澔橦"}
{ "create": { "_index": "baimei-shopping"} }
{ "ip_addr": "211.144.24.221" ,"title":"吉普(JEEP)冰丝短袖T恤男运动套装夏季修身显瘦潮流整套搭配冰感T恤衣服男 灰色 F9803 XL 建议128-145斤","price":189,"brand":"jeep","item":" https://item.jd.com/10050847501196.html?bbtf=1","group":2,"author":"刘澔橦"}
{ "create": { "_index": "baimei-shopping"} }
{ "ip_addr": "211.144.24.221" ,"title":"HLA海澜之家修身三件套时尚挺括套西男礼服套装HTXAD3Q096A 藏青96 175/96B 推荐69-74kg","price":439,"brand":"海澜之家","item":"https://item.jd.com/70924395652.html?bbtf=1","group":2,"author":"刘澔橦"}
我们单独的写 logstash 配置文件, 来处理这些数据:
(1) json.conf
input {
file {
start_position => "beginning"
path => ["/tpm/apps/*.json"]
type => "json"
}
}
filter {
json {
source => "message"
}
mutate {
remove_field => ["@version"]
rename => {
"auther" => "author"
"ipaddr" => "ip_addr"
"in_addr" => "ip_addr"
}
}
geoip {
"source" => "ip_addr"
}
}
output {
elasticsearch {
hosts => ["10.0.0.111:19200","10.0.0.112:19200","10.0.0.113:19200"]
index => "baimei-shopping"
}
# stdout {
# codec => rubydebug
# }
}
如果是测试,我们打开
stdout { codec => rubydebug }
我们先用 不用 pipline 来处理一下:
logstash -rf /root/config/json.conf
启动过后,我们往文件中写入数据测试一下:
tail -1 1.json >> 1.json
这样我们的配置文件是没问题的。
(2) bulk.conf
input {
file {
start_position => "beginning"
path => ["/tmp/apps/*.bulk"]
type => "bulk"
}
}
filter {
if [message] =~ ".*create.*" {
drop { }
}
json {
source => "message"
}
mutate {
remove_field => ["@version"]
rename => {
"auther" => "author"
"ipaddr" => "ip_addr"
"in_addr" => "ip_addr"
}
}
geoip {
"source" => "ip_addr"
}
}
output {
elasticsearch {
hosts => ["10.0.0.111:19200","10.0.0.112:19200","10.0.0.113:19200"]
index => "baimei-shopping"
}
stdout {
codec => rubydebug
}
}
(3) txt.conf
input {
file {
start_position => "beginning"
path => ["/baimei/apps/*.txt"]
type => "txt"
}
}
filter {
mutate {
split => { "message" => "," }
}
mutate {
add_field => {
"%{[message][0]}" => "%{[message][1]}"
"%{[message][2]}" => "%{[message][3]}"
"%{[message][4]}" => "%{[message][5]}"
"%{[message][6]}" => "%{[message][7]}"
"%{[message][8]}" => "%{[message][9]}"
"%{[message][10]}" => "%{[message][11]}"
"%{[message][12]}" => "%{[message][13]}"
}
}
mutate {
remove_field => ["@version"]
rename => {
"auther" => "author"
"ipaddr" => "ip_addr"
"in_addr" => "ip_addr"
}
}
geoip {
"source" => "ip_addr"
}
}
output {
elasticsearch {
hosts => ["10.0.0.111:19200","10.0.0.112:19200","10.0.0.113:19200"]
index => "baimei-shopping"
}
stdout {
codec => rubydebug
}
}
开始配置我们的 pipline
(1)
vim /etc/logstash/pipelines.yml
- pipeline.id: main
path.config: "/etc/logstash/conf.d/*.conf"
- pipline.id : baimei-bulk
path.config: "/root/config/bulk.conf"
- pipline.id : baimei-json
path.config: "/root/config/json.conf"
- pipline.id : baimei-txt
path.config: "/root/config/text.conf"
(2) 运行:
直接 运行logstash 就可以
logstash
报错,说 文件找不到, 这里主要是 因为 logstash rpm 安装方式,和 二进制方式安装,不太一样。
我们来配置一下:
做一个软连接:
[root@baimeidashu-elk113 ~]#
ln -sv /etc/logstash/pipelines.yml /usr/share/logstash/config/
再次启动:
验证 ES 的数据,是否成功上传就可以了。