知識分享-ELK介紹 (1)
=====
# Agenda
1. ELK 基本介紹
2. 使用 Logstash 做資料整理
- 以國際卡監控案為範例
3. 提高資料高可用性 (Data Resiliency)

# ELK 基本介紹
- Elasticsearch
- 基於Lucene搜索引擎的NoSQL數據庫
- Logstash
- 處理來自不同來源的數據,並儲存至你喜愛的儲存體
- Kibana
- 提供elasticsearch的儀錶板和進行數據可視化。

# 使用 Logstash 做資料整理
## 系統組態檔
**路徑 : /etc/logstash/**
1. logstash.yml: 為系統主要組態檔
2. pipeline.yml: 包含在單個lostash實例中運行多個管道的框架及說明
3. jvm.options: 包含JVM組態設定
4. log4j2.properties: log4j2 組態設定
5. startup.options (Linux): 包含系統服務啟動時環境變數
### Pipeline 優化設定
**如果沒有超過每秒數萬,可以先讓 Logstash 使用預設值跑跑看**
1. pipeline.workers
- 決定要用幾個 Thread 來處理 Filter 和 Output,預設為CPU核心數
- CPU 尚未充分使用,可以用CPU的倍數調高此數
2. pipeline.batch.size
- 每個 Worker Thread 最大的 Event 數
- 條高可增加效能,但會增加Java Heap Memory
3. pipeline.batch.delay
- 在單一個 Worker Thread 中等待新的 Event 被傳送進來的時間 (milliseconds )
> 參考 :
> [logstash-settings-file](https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html)
> [我的 ElasticSearch 調校之旅](https://medium.com/starbugs/%E6%88%91%E7%9A%84-elasticsearch-%E8%AA%BF%E6%A0%A1%E4%B9%8B%E6%97%85-89c380b5673c)
> [Performance Troubleshooting](https://www.elastic.co/guide/en/logstash/current/performance-troubleshooting.html)
## 資料型態
- Boolean
```json=
ssl_enable => true
```
- Number
```json=
port => 33
```
- String
```json=
name => "Hello world"
```
- Hash
hash 是一個key value pairs的集合,需注意使用空白而不是逗號分隔。
```json=
match => {
"field1" => "value1"
"field2" => "value2"
...
}
```
- Lists
```json=
path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]
```
>參考 :
>[configuration-file-structure](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html)
## Pipeline 設定檔
**Logstash Pipeline 設定檔放置於 /etc/logstash/conf.d**
**設定檔結構:**
```json=
# This is a comment. You should use comments to describe
# parts of your configuration.
input {
# 輸入資料來源
}
filter {
# 資料處理
}
output {
# 輸出資料目的
}
```
>參考 :
>[input-plugins](https://www.elastic.co/guide/en/logstash/current/input-plugins.html)
>[filter-plugins](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html)
>[output-plugins](https://www.elastic.co/guide/en/logstash/current/output-plugins.html)
## Logstash 欄位存取
- 存取欄位的基本語法為[fieldname]。
- 如果是top-level欄位,則可以省略"[ ]"。
- 要引用nested欄位,請指定該欄位的完整路徑:[top-level field][nested field]。
#### 範例
- top-level fields (agent, ip, request, response, ua)
- nested fields (status, bytes, os)
**存取 request ,可使用 [request] 或簡寫 request 。**
**存取 status ,使用[response][status]。**
```json
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
"ip": "192.168.24.44",
"request": "/index.html"
"response": {
"status": 200,
"bytes": 52353
},
"ua": {
"os": "Windows 7"
}
}
```
## Logstash Input plugin
**以國際卡監控為範例:**
**logstash.conf**
1. beats :
- 在5044 Port 接收來自 Elastic Beats 框架的事件。
2. heartbeat :
- Generates heartbeat events for testing。
- 固定時間產生 Event 避免 Output 長時間無連線,被防火牆中斷連線。
```json=
input
{
beats {
port => 5044
}
heartbeat {
message => "sequence"
interval => 30
type => "batch_log"
}
}
```
**grafanalog.conf**
file :
- path : 檔案路徑
- codec : 編碼器,使用 JSON 格式解析
- discover_interval : 讀取檔案的頻率,需再乘以 stat_interval (預設1秒)
```json=
input {
file {
path => "/ICTM/log/grafana/grafana.log"
codec => "json"
discover_interval => 15
}
}
```
**alert.conf**
http :
- 接收來在 Grafana Alert 事件
- host : Listen IP
- port : Listen Port
```json=
input {
http {
host => "127.0.0.1"
port => 31311
}
}
```
>參考 :
>[input-plugins](https://www.elastic.co/guide/en/logstash/current/input-plugins.html)
## Logstash Filter
### Grok Filter
可以在 grok 裡預定義好命名正則表達式在稍後在(grok參數或者其他正則表達式裡)引用它。
正規表達式:
```
(?<queue_id>[0-9A-F]{10,11})
(?<card_unformat>{.*})
```
grok 表達式
```
%{SYNTAX:SEMANTIC}
%{NUMBER:duration} %{IP:client}
```
**以國際卡監控為範例:**
```
[Timer-3] 00:00:52,496 INFO LogMoniData L:324 - {"Role":"950005","Link":"Stat_App","Level":"info","Status":"Normal","StartDate":"20201127","StartTime":"18:59","HeartBeat":"00:00"}
```
**從 card log(message) 分出 log 附加資訊(prefixdata) 跟 log Json 資料(card_unformat)**
**再從 log 附加資訊(prefixdata) 細分出個欄位**
```json=
# 1.retrive card log json data
grok {
match => ["message", "%{DATA:prefixdata} - (?<card_unformat>{.*})"]
}
grok {
match => ["prefixdata", "\[%{DATA:thread}\]\s+%{DATA:aplogtime}\s+%{DATA:loglevel}\s+%{DATA:logclass}\s+%{DATA:logline}\s+"]
}
```
>參考 :
>[filter配置 | ELK 教程](http://docs.flycloud.me/docs/ELKStack/logstash/plugins/filter/index.html)
>[filter-plugins](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html)
### JSON Filter
將 JSON 字串(card_unformat)轉換為物件,並將結果放入 target (message)欄位。
```json=
# 2. transform json data to es filed
json {
source => "card_unformat"
target => "message"
}
```
### 條件判斷
比較運算式 :
- equality: ==, !=, <, >, <=, >=
- regexp: =~, !~ (checks a pattern on the right against a string value on the left)
- inclusion: in, not in
布林運算式 :
- and, or, nand, xor
一元運算式 :
- !
### 數據修改(Mutate)
提供了豐富的基礎類型數據處理能力,包括類型轉換,字串處理和欄位處理等。
字串處理:
- gsub : 字串取代,僅對字串類型欄位有效
- split : 對字串類型欄位分割
- join : 僅對Arrary類型欄位有效
- merge : 合併兩個數組或者哈希欄位
欄位處理:
- add_field : 新增欄位
- copy : 將欄位複製至新欄位
- rename : 重命名某個欄位,如果目的欄位已經存在,會被覆蓋掉
- update : 更新某個欄位的內容,如果欄位不存在,不會新建。
- replace : 作用和 update 類似,但是當欄位不存在的時候,它會起到 add_field 參數一樣的效果,自動添加新的欄位。
- remove_field : 刪除欄位,可一次刪除多個欄位
類型轉換:
- 可以設置的轉換類型包括:“integer”,“float” 和 “string”
```json=
filter {
mutate {
convert => {
"fieldname" => "integer"
"booleanfield" => "boolean"
}
}
}
```
>參考 :
>[plugins-filters-mutate](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html)
#### 範例1 (判斷欄位存在)
**判斷 issBank 欄位是否存在,存在則將值帶入 issBank 不存在放入 "unknown"**
欄位處理:
- add_field : 新增欄位
- copy : 將欄位複製至新欄位
```json=
# add issBank field for transaction_message if not exists
# move message.issBank to issBank
if [message][mti] and [message][issBank] {
mutate { copy => { "[message][issBank]" => "issBank"} }
}
else if [message][mti] {
mutate { add_field => { "issBank" => "unknown"} }
}
```
#### 範例2 (比對字串在陣列中)
**由不同來源定義不同欄位名稱如 isTimeout 或是 timeout ,都將值映射至 isTimeout 欄位**
**使用 in 可比對是否包含於陣列中**
```json=
# move message.isTimeout or message.timeout boolean value to isTimeout
if [message][mti] and [message][isTimeout] in ["","true","false"] {
mutate { copy => { "[message][isTimeout]" => "isTimeout"} }
}
else if [message][mti] and [message][timeout] in ["","true","false"] {
mutate { copy => { "[message][timeout]" => "isTimeout"} }
}
```
#### 範例3 (移除多餘欄位)
**移除多餘欄位**
```json=
# 7. remove unsued fields
mutate {
remove_field => [ "card_unformat", "[agent][ephemeral_id]", "[agent][hostname]", "[agent][id]", "ecs", "host", "tags", "type", "prefixdata" ]
}
```
#### 範例4 (Ruby 處理)
資料處理功能若是無法滿足任務需求,可以透過 filters/ruby 插件處理。
**案例:測試套 Windows 主機校時出現晚1分鐘,造成 Grafana 告警誤判。**
**原 @timestamp 是 FileBeat 採樣的時間 (並非實際 log 的時間)**
**為避免監控系統會出現幾秒內的資料空白影響告警判斷,調整時間戳至最接近現在時間**
**1. 新增 logstash_time 欄位並儲存當下時間**
**2. 將 logstash_time 欄位值覆蓋至 @timestamp 欄位**
```json=
ruby {
code => "event.set('logstash_time', Time.now());"
}
mutate { rename => { "logstash_time" => "@timestamp" } }
```
#### 範例5(metrics)
**測量Event 筆數**
- events:
- count:397740 (累積筆數)
- rate_1m:6.886617278337198 (最近1分鐘每秒筆數)
- rate_5m:6.726008812826743 (最近5分鐘每秒筆數)
- rate_15m:6.668854310537374 (最近15分鐘每秒筆數)
```json=
# 9. eps,events per sec
metrics {
meter => "events"
add_tag => "metric"
}
...
else if "metric" in [tags] {
elasticsearch {
hosts => ["http://10.201.197.21:9200"]
# unreconized messages
index => "metric_log-%{+yyyy.MM.dd}"
}
}
```
>參考 :
>[plugins-filters-metrics](https://www.elastic.co/guide/en/logstash/current/plugins-filters-metrics.html)
## Output plugins
**輸出至 elasticsearch ,採1天產1個index方式**
```json=
elasticsearch {
hosts => ["http://10.201.197.21:9200"]
index => "transaction_message-%{+yyyy.MM.dd}"
}
```
>參考:
>[output-plugins](https://www.elastic.co/guide/en/logstash/current/output-plugins.html)
# 提高資料高可用性 (Data Resiliency)
資料流事件處理管道時,Logstash可能會遇到異常情況,造成服務異常終止情況。
為防止資料丟失並確保事件不間斷地流經管道,Logstash提供以下資料恢復功能。
1. Persistent Queues:
- 將資料佇列儲存事件於內部硬碟,以防止資料丟失
2. Dead letter Queues:
- Logstash無法處理的事件提供磁盤存儲。透過dead_letter_queue輸入插件輕鬆地重新處理dead letter queue中的事件。
## Persistent queue
1. Logstash在管道階段 (input → pipeline worker) 之間使用內存中有佇列來緩衝事件。 這些記憶體中佇列的大小是固定的,不可配置。如果Logstash遇到臨時硬體故障,則記憶體中佇列的內容將丟失。為了防止異常終止期間的資料丟失,Logstash具有持久佇列功能,該功能將事件佇列存儲在硬碟上。
2. 過往透過訊息代理服務( Redis、RabbitMQ或 Kafka) 以促進訊息緩衝,現在可以使用持久佇列功能。
3. 使用Persistent Queues 會影響Logstash整體效能,尤其是當queue.type選擇persisted把資料存放在硬碟上,同步磁盤I/O會大大影響效能需要多開多Logstash管道才有辦法改善
```flow
st=>start: input
e=>end: output
op=>operation: persistent queue
op2=>operation: filter
st(right)->op(right)->op2(right)->e
```
設定在pipeline如下:
```json=
queue.type: persisted
queue.max_bytes: 4gb
```
## Dead letter queue
當Logstash遇到資料映射錯誤或其他問題而無法處理的事件時,Logstash管道會掛起或丟棄不成功的事件。為了防止在這種情況下丟失資料,配置Logstash將不成功的事件寫入dead letter queue而不是丟棄。
默認情況下是禁用,要使用需編輯 /etc/logstash/logstash.yml
```json=
dead_letter_queue.enable: true
path.dead_letter_queue: "path/to/data/dead_letter_queue" #設定資料存放位置
```
重新處理失敗的資料
可以再把失敗的資料接起來處理,使用dead_letter_queue套件即可
```json=
input {
dead_letter_queue {
path => "/path/to/data/dead_letter_queue" #dead_letter_queue設定的位置
commit_offsets => true #保存偏移量。管道重新啟動時,它將繼續從中斷的位置開始讀取,而不是重新處理隊列中的所有項目
pipeline_id => "main"
}
}
```
>參考
>[Day10-Logstash-Queues](https://ithelp.ithome.com.tw/articles/10237110)
>[下一頁 :知識分享-ELK介紹 (2)](https://hackmd.io/@VlYX8fpjQQyfuaknhjK7eg/BkyANreid)