Logstash
===
---
系統組態檔:
Lostash會隨著安裝時配置以下檔案
- logstash.yml: 為系統主要組態檔
- pipeline.yml: 包含在單個lostash實例中運行多個管道的框架及說明
- jvm.options: 包含JVM組態設定
- log4j2.properties: log4j2 組態設定
- startup.options (Linux): 包含系統服務啟動時環境變數
---
Pipeline 設定檔:
- Logstash設定檔需放置於/etc/logstash/conf.d
---
# Logstash configuration
---
## Structure of a Config File
```json=
# This is a comment. You should use comments to describe
# parts of your configuration.
input {
...
}
filter {
...
}
output {
...
}
```
運行Logstash設定檔
```json=
bin/logstash -f logstash-simple.conf
```
---
## 資料型態
---
### Boolean
```json=
ssl_enable => true
```
---
### Number
```json=
port => 33
```
---
### String
```json=
name => "Hello world"
```
---
### Hash
hash 是一個key value pairs的集合,需注意使用空白而不是逗號分隔。
```json=
match => {
"field1" => "value1"
"field2" => "value2"
...
}
```
---
### Lists
```json=
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]
```
---
### Bytes
支援SI (k M G T P E Z Y) 和 Binary (Ki Mi Gi Ti Pi Ei Zi Yi)單位
- Binary單位為base-1024
- SI單位為base-1000
- 不指定單位則以bytes表示
- 該欄位不分大小寫,接受數值與單位之間的空白
```json=
my_bytes => "1113" # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
```
---
### Codec
Codec可用於資料輸入和輸出編解碼。
可用的[Codec Plugin](https://www.elastic.co/guide/en/logstash/6.8/codec-plugins.html)
```json=
codec => "json"
```
---
## Field References
- 存取欄位的基本語法為[fieldname]。
- 如果您指的是top-level欄位,則可以省略"[ ]"。
- 要引用nested欄位,請指定該欄位的完整路徑:[top-level field][nested field]。
---
下列範例
- top-level fields (agent, ip, request, response, ua)
- nested fields (status, bytes, os)
```json
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
"ip": "192.168.24.44",
"request": "/index.html"
"response": {
"status": 200,
"bytes": 52353
},
"ua": {
"os": "Windows 7"
}
}
```
---
## 條件判斷
比較運算式
```json=
equality: ==, !=, <, >, <=, >=
regexp: =~, !~ (checks a pattern on the right against a string value on the left)
inclusion: in, not in
```
布林運算式
```json=
and, or, nand, xor
```
一元運算式
```json=
!
```
---
範例
```json=
output {
# Send production errors to pagerduty
if [loglevel] == "ERROR" and [deployment] == "production" {
pagerduty {
...
}
}
}
```
---
## Logstash Input plugin
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
---
- File
- http
- Jdbc
- Log4j
- Stdin
- Tcp
- Udp
....
---
## 標準輸入(stdin)
從標準輸入讀取事件,每個事件都是一行,如果要多行事件要成為單一事件,則需要使用多線編解碼器(multiline)。
```
input {
stdin {
codec => "plain"
id => "my_plugin_id"
}
}
output {
stdout { codec => rubydebug }
}
```
---
## 讀取文件(File)
Logstash 會監聽文件變化。而且會記錄一個叫.sincedb的資料庫文件來跟踪被監聽的日誌文件的當前讀取位置。不用擔心 logstash會遺漏資料。
```
input {
file {
path => ["/var/log/*.log","/var/log/**/*.log"]
id => "my_plugin_id"
}
}
```
---
## 讀取網路數據 (TCP)
通過TCP Socket取事件,支持log4j2 logs
```
input {
tcp {
port => 8888
mode => "server"
ssl_enable => false
}
}
output {
stdout { codec => rubydebug }
}
```
---
# Output plugins
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
---
- Csv
- Email
- File
- http
- Tcp
- Stdout
...
---
## 標準輸出(Stdout)
一個簡單的輸出到運行中Logstash的標準輸出。 調試插件配置時,通過允許在通過輸入和過濾器後立即訪問事件數據,此輸出非常方便。
```
output {
stdout { codec => json }
}
```
---
## 保存成文件(File)
輸出將事件寫入磁碟上,可以將事件中的字段以文件名稱/或路徑的一部分。預設情況下,此輸出以json格式每行寫入一個事件。
```
output {
file {
path => ...
codec => line { format => "custom format: %{message}"}
}
}
```
---
## 調用命令執行(Exec)
exec輸出將為收到的每個事件運行命令。 將使用Ruby的system()函數,命令字符串將傳遞給shell。 在命令中可以使用%{name}和其他動態字符串將選擇字段從事件傳遞到子進程。
```
output {
if [type] == "abuse" {
exec {
command => "iptables -A INPUT -s %{clientip} -j DROP"
}
}
}
```
---
## 輸出到Elasticsearch
將輸出將事件輸出到Elasticsearch。Elasticsearch 使用 bulk API 進行大量資料傳輸。
```
output {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}"
}
}
```
---
# logstash filter
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
過濾器插件對事件執行中間處理。
---
- Grok
- Json
- Mutate
- Ruby
- Split
...
---
## Grok 正則捕獲
Grok 是 Logstash 最重要的插件。你可以在 grok 裡預定義好命名正則表達式在稍後在(grok參數或者其他正則表達式裡)引用它。
Examples
```
55.3.244.1 GET /index.html 15824 0.043
```
pattern
```
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
```
---
程式範例
```
input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
```
處理結果
```
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
```
---
## 數據修改(Mutate)
- filters/mutate 插件是 Logstash 另一個重要插件。它提供了豐富的基礎類型數據處理能力。包括類型轉換,字符串處理和字段處理等。
- 類型轉換:可以設置的轉換類型包括:"integer","float" 和 "string"
---
```
filter {
mutate {
convert => {
"fieldname" => "integer"
"booleanfield" => "boolean"
}
}
}
```
---
## 數據修改(Mutate)
字符串處理
---
- gsub :字串取代,僅對字串類型字段有效
```
filter {
mutate {
gsub => [
# replace all forward slashes with underscore
"fieldname", "/", "_",
# replace backslashes, question marks, hashes, and minuses
# with a dot "."
"fieldname2", "[\\?#-]", "."
]
}
}
```
---
- split: 對字串類型字段分割
```
filter {
mutate {
split => ["message", "|"]
}
}
```
---
- join:僅對Arrary類型字段有效
```
filter {
mutate {
join => { "fieldname" => "," }
}
}
```
---
- merge:合併兩個數組或者哈希字段
```
`array` + `string` will work
`string` + `string` will result in an 2 entry array in `dest_field`
`array` and `hash` will not work
filter {
mutate {
merge => { "dest_field" => "added_field" }
}
}
```
---
# 數據修改(Mutate)
字段處理
---
- rename: 重命名某個字段,如果目的字段已經存在,會被覆蓋掉
```
filter {
mutate {
split => { "hostname" => "." }
add_field => { "shortHostname" => "%{[hostname][0]}" }
}
mutate {
rename => ["shortHostname", "hostname" ]
}
}
```
---
- update: 更新某個字段的內容,如果字段不存在,不會新建。
```
filter {
mutate {
update => { "sample" => "My new message" }
}
}
```
---
- replace: 作用和 update 類似,但是當字段不存在的時候,它會起到 add_field 參數一樣的效果,自動添加新的字段。
```
filter {
mutate {
replace => { "message" => "%{source_host}: My new message" }
}
}
```
---
## JSON filter
預設情況下,JSON解析過濾器它將解析的JSON放在Logstash事件的root(頂層)中,但是可以使用此配置將此過濾器配置為將JSON放入任何任意事件字段中 `target`。
---
```
filter {
json {
source => "message"
target => "jsoncontent"
}
}
```
---
## Ruby 處理
- 資料處理功能若是無法滿足任務需求,可以透過 Ruby 處理,filters/ruby 插件將會是一個非常有用的工具。
---
```
filter{
date {
match => ["datetime" , "UNIX"]
}
ruby {
code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"
}
}
```
透過 ruby code 進行時間判斷,濾掉時間範圍與當前時間差距太大的數據。
---
# Data Resiliency
資料流事件處理管道時,Logstash可能會遇到異常情況,造成服務異常終止情況。 為防止資料丟失並確保事件不間斷地流經管道,Logstash提供以下資料恢復功能。
---
1. Persistent Queues:
- 將資料佇列儲存事件於內部硬碟,以防止資料丟失
2. Dead letter Queues:
- Logstash無法處理的事件提供磁盤存儲。透過dead_letter_queue輸入插件輕鬆地重新處理dead letter queue中的事件。
---
## Persistent queue
1. Logstash在管道階段 (input → pipeline worker) 之間使用內存中有佇列來緩衝事件。 這些記憶體中佇列的大小是固定的,不可配置。如果Logstash遇到臨時硬體故障,則記憶體中佇列的內容將丟失。為了防止異常終止期間的資料丟失,Logstash具有持久佇列功能,該功能將事件佇列存儲在硬碟上。
2. 過往透過訊息代理服務( Redis、RabbitMQ或 Kafka) 以促進訊息緩衝,現在可以使用持久佇列功能。
---
input→persistent queue→filter→output
設定在pipeline如下:
```
queue.type: persisted
queue.max_bytes: 4gb
```
---
## Dead letter queue
當Logstash遇到資料映射錯誤或其他問題而無法處理的事件時,Logstash管道會掛起或丟棄不成功的事件。為了防止在這種情況下丟失資料,配置Logstash將不成功的事件寫入dead letter queue而不是丟棄。
---
默認情況下是禁用,要使用需編輯 /etc/logstash/logstash.yml
```
dead_letter_queue.enable: true
path.dead_letter_queue: "path/to/data/dead_letter_queue" #設定資料存放位置
```
---
重新處理失敗的資料
可以再把失敗的資料接起來處理,使用dead_letter_queue套件即可
```
input {
dead_letter_queue {
path => "/path/to/data/dead_letter_queue" #dead_letter_queue設定的位置
commit_offsets => true #保存偏移量。管道重新啟動時,它將繼續從中斷的位置開始讀取,而不是重新處理隊列中的所有項目
pipeline_id => "main"
}
}
```
---
# 敬請指教
----
{"metaMigratedAt":"2023-06-16T00:31:32.961Z","metaMigratedFrom":"Content","title":"Logstash","breaks":true,"contributors":"[{\"id\":\"565617f1-fa63-410c-9fb9-a9278632bb7a\",\"add\":10345,\"del\":2141}]"}