logstash-quickstart

logstash quickstart

首先确定安装java,然后按照官网指导安装 logstash。macos 可以直接使用 brew install logstash 来安装。

hello world

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$logstash -e 'input{stdin{}} output{stdout{}}'
Sending Logstash logs to /usr/local/Cellar/logstash/6.5.4/libexec/logs which is now configured via log4j2.properties
[2019-01-16T16:28:56,611][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-01-16T16:28:56,634][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.5.4"}
[2019-01-16T16:28:59,384][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2019-01-16T16:28:59,508][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x20e105d1 run>"}
The stdin plugin is now waiting for input:
[2019-01-16T16:28:59,577][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-01-16T16:28:59,905][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
hello world
{
"@version" => "1",
"message" => "hello world",
"host" => "macbook-Pro.local",
"@timestamp" => 2019-01-16T08:29:12.412Z
}
[2019-01-16T16:44:07,783][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x20e105d1 run>"}

-e 表示从命令行直接读取配置,CRTL+D将退出 logstash。

日志解析

在创建Logstash管道之前,需要配置Filebeat来把日志发送到Logstash。安装filebeat可以使用 brew install filebeat。安装Filebeat后,需要进行配置。打开filebeat.yml(mac brew 路径 /usr/local/etc/filebeat),并使用以下行替换内容。确保paths指向日志文件:

1
2
3
4
5
6
filebeat.prospectors:
- type: log
paths:
- /path/to/file/logstash-tutorial.log # 文件或目录的绝对路径
output.logstash:
hosts: ["localhost:5044"]

注意:在filebeat 7.0中 配置filebeat.prospectors 已经被改为 filebeat.inputs,启动时会有如下异常:Exiting: 1 error: setting ‘filebeat.prospectors’ has been removed

执行下面的命令来运行Filebeat:

1
filebeat -e -c filebeat.yml -d "publish"

Filebeat使用5044进行连接,在Logstash启动Beats插件之前,Filebeat不会收到任何响应

接下来创建一个Logstash管道使用Beats input插件接收来自Beats的事件。配置文件first-pipeline.conf的内容应该如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
input {
beats {
port => "5044"
}
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
stdout { codec => rubydebug }
}

使用下面的命令来检查你的配置文件:

1
logstash -f first-pipeline.conf --config.test_and_exit

--config.test_and_exit 选项会分析你的配置文件并将其中的错误输出。如果配置文件通过了检查,使用下面的命令来启动Logstash:

1
2
logstash -f first-pipeline.conf --config.reload.automatic
# --config.reload.automatic选项可以让Logstash在你修改配置文件之后重载而不必重新启动。

此时可以看到输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"@version" => "1",
"prospector" => {
"type" => "log"
},
"beat" => {
"name" => "hostname",
"hostname" => "hostname-Pro.local",
"version" => "6.2.4"
},
"source" => "/path/logstash-tutorial-dataset.log",
"@timestamp" => 2019-01-21T05:10:39.667Z,
"message" => "218.30.103.62 - - [04/Jan/2015:05:28:21 +0000] \"GET /blog/productivity/better-zsh-xterm-title-fix.html HTTP/1.1\" 200 10185 \"-\" \"Sogou web spider/4.0(+http://www.sogou.com/docs/help/webmasters.htm#07)\"",
"offset" => 22128,
"host" => "hostname",
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}

输出完毕后,进入 path.data目录(brew 安装的是/usr/local/var/lib/filebeat),查看registry 文件:

1
[{"source":"/path/logstash-tutorial-dataset.log","offset":24464,"timestamp":"2019-01-21T13:15:44.789115+08:00","ttl":-1,"type":"log","FileStateOS":{"inode":8610500539,"device":16777220}}]

filebeat 会记录收集的每个文件的状态,因此删除registry文件会强制Filebeat它从头开始读取收集的所有文件。

使用Grok Filter插件解析Web Logs

现在你有一个工作管道从Filebeat读取日志行。但是你会注意到日志消息的格式并不理想。您希望解析日志消息以便从日志中创建特定的命名字段。要达到此目的,可以使用grok过滤器插件。

grok 插件是 logstash 默认可以使用的几个插件。Grok插件可以将非结构化的数据转换为高质量的结构化数据。

典型的Web服务日志如下:

1
2
3
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

很容易辨认出最开始的是IP地址,方括号中的是时间标签。对于Apache日志你可以使用%{COMBINEDAPACHELOG}来将数据解析成如下结构。

Information Field Name
IP Address clientip
User ID ident
User Authentication auth
timestamp timestamp
HTTP Verb verb
Request body request
HTTP Version httpversion
HTTP Status Code response
Bytes served bytes
Referrer URL referrer
User agent agent

编辑first-pipeline.conf文件并使用下面的内容替换其中的filter字段:

1
2
3
4
5
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
}

接下来删除上面提到的registry 文件,重新启动 logstash。可以看到如下解析后的 log:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"httpversion" => "1.1",
"tags" => [
[0] "beats_input_codec_plain_applied"
],
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
"clientip" => "71.212.224.97",
"verb" => "GET",
"auth" => "-",
"source" => "/pathlogstash-tutorial-dataset.log",
"request" => "/images/web/2009/banner.png",
"timestamp" => "04/Jan/2015:05:27:35 +0000",
"message" => "71.212.224.97 - - [04/Jan/2015:05:27:35 +0000] \"GET /images/web/2009/banner.png HTTP/1.1\" 200 52315 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36\"",
"response" => "200",
"bytes" => "52315",
"referrer" => "\"http://www.semicomplete.com/projects/xdotool/\"",
"host" => "hostname",
"@timestamp" => 2019-01-21T06:30:58.330Z,
"ident" => "-",
"@version" => "1",
"beat" => {
"hostname" => "hostname",
"version" => "6.2.4",
"name" => "hostname"
},
"offset" => 21199,
"prospector" => {
"type" => "log"
}
}

需要注意的是原本的消息内容会被保留,但消息同时也会被分割成不同的字段。

使用GeoIP插件丰富你的数据

除了解析日志以便于搜索,filter插件还可以从现有的数据中进行扩展。举个例子,geoip插件可以从IP地址获取物理位置信息并将其添加到日志中。

在first-pipeline.conf文件的filter字段中添加以下内容来使用geoipfilter插件:

1
2
3
geoip {
source => "clientip"
}

geoip插件的配置需要你指定包含要查找的IP地址的源字段的名称。在示例中,clientip字段包含IP地址。

因为filter是按照顺序进行解析,所以配置文件中的geoip字段要在grok字段之后且这两个字段都要在filter字段中

当你做完之后,first-pipeline.conf文件中的内容应该如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
stdout { codec => rubydebug }
}

重启Filebeat,解析后的事件中现在已经包含了物理位置信息。

将数据索引到Elasticsearch

Logstash可以将数据索引到Elasticsearch集群。编辑first-pipeline.conf文件中的output字段,内容如下:

1
2
3
4
5
output {
elasticsearch {
hosts => [ "localhost:9200" ]
}
}

这个配置中,Logstash使用HTTP协议连接Elasticsearch。上面的示例中Logstash和Elasticsearch运行在相同 的实例中。你也可以指定一个远程Elasticsearch实例通过配置hosts如:hosts => ["es-machine:9200"]

重新执行 filebeat和Logstash,将数据存储到Elasticsearch集群中,之后可以在Elasticsearch集群中进行查询。

尝试在Elasticsearch中查询grokfilter插件创建的字段。用YYYY.MM.DD格式的当前时间替换下面的$DATE:

1
curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'

数据使用的索引名称基于UTC时间,并非Logstash运行的当地时间。如果查询返回index_not_found_exception,确保logstash-$DATE对应的名字是正确的索引名。可以使用这个查询指令来查看所有可用的索引:curl 'localhost:9200/_cat/indices?v'

logstash 支持输出到多个 es 节点。当hosts中配置多个IP地址的时候,Logstash会在地址列表中进行负载均衡,如果Elasticsearch使用的是默认的9200端口,你可以在上面的配置中省略它。

把Logstash数据写入文件

使用fileoutput插件,可以配置你的Logstash管道直接将数据写入文件。

通过向pipeline.conf文件的output配置段添加如下信息来使用fileoutput插件:

1
2
3
4
5
6
7
8
9
# 在输出到 es 时同时输出到文件
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
file {
path => "/path/to/target/file"
}
}

工作原理

Logstash事件处理管道有三个阶段:输入→过滤器→输出。输入生成事件,过滤器修改它们,输出将它们发送到其他地方。输入和输出支持编解码器,使您能够在数据进入或退出管道时对数据进行编码或解码,而无需使用单独的过滤器。

logstash在input阶段,将数据导入。下面是一些常用的input插件:

  • file: 从文件系统上读取文件
  • syslog: 在端口上监听syslog消息,并根据RFC3164进行解析
  • redis:使用redis通道和redis列表从redis服务器读取。
  • beats:处理beats发送的事件。

过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。一些有用的过滤包括:

  • grok:解析并构造任意文本。Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式。
  • mutate:对事件字段执行常规转换。可以重命名,删除,替换和修改事件中的字段。
  • drop:完全删除事件,例如调试事件。
  • clone:制作事件的副本,可能添加或删除字段。
  • geoip:添加有关IP地址的地理位置的信息

输出是Logstash管道的最后阶段。事件可以通过多个输出,但是一旦所有输出处理完成,事件就完成了它的执行。一些常用的输出包括:

  • elasticsearch:将事件数据发送到Elasticsearch。
  • file:将事件数据写入磁盘上的文件。
  • graphite:将事件数据发送到graphite。
  • statsd:将事件数据发送到statsd。

编解码器基本上是流过滤器,可以作为输入或输出的一部分。使用编解码器可以轻松地将消息传输与序列化过程分开。流行的编解码器包括json,, msgpack和plain (文本)。

  • json:以JSON格式编码或解码数据。
  • multiline:将多行文本事件(如java异常和堆栈跟踪消息)合并到一个事件中。

Logstash事件处理管道协调输入,过滤器和输出的执行。Logstash管道中的每个输入阶段都在其自己的线程中运行。将事件写入内存(默认)或磁盘上的中央队列。每个管道worker线程从该队列中获取一批事件,事件接着会通过配置的过滤器,然后过滤的事件通过输出。批处理的大小和管道工作线程的数量是可配置的。

默认情况下,Logstash在管道阶段(输入→过滤器和过滤器→输出)之间使用内存中有界队列来缓冲事件。如果Logstash不安全地终止,则存储在内存中的任何事件都将丢失。为了帮助防止数据丢失,您可以启用Logstash将正在进行的事件保存到磁盘。

-------------本文结束感谢您的阅读-------------
坚持分享,您的支持将鼓励我继续创作!
0%