知識分享-ELK介紹 (1) ===== # Agenda 1. ELK 基本介紹 2. 使用 Logstash 做資料整理 - 以國際卡監控案為範例 3. 提高資料高可用性 (Data Resiliency) ![](https://i.imgur.com/szWxAoA.png) # ELK 基本介紹 - Elasticsearch - 基於Lucene搜索引擎的NoSQL數據庫 - Logstash - 處理來自不同來源的數據,並儲存至你喜愛的儲存體 - Kibana - 提供elasticsearch的儀錶板和進行數據可視化。 ![](https://i.imgur.com/t2pvi6U.png) # 使用 Logstash 做資料整理 ## 系統組態檔 **路徑 : /etc/logstash/** 1. logstash.yml: 為系統主要組態檔 2. pipeline.yml: 包含在單個lostash實例中運行多個管道的框架及說明 3. jvm.options: 包含JVM組態設定 4. log4j2.properties: log4j2 組態設定 5. startup.options (Linux): 包含系統服務啟動時環境變數 ### Pipeline 優化設定 **如果沒有超過每秒數萬,可以先讓 Logstash 使用預設值跑跑看** 1. pipeline.workers - 決定要用幾個 Thread 來處理 Filter 和 Output,預設為CPU核心數 - CPU 尚未充分使用,可以用CPU的倍數調高此數 2. pipeline.batch.size - 每個 Worker Thread 最大的 Event 數 - 條高可增加效能,但會增加Java Heap Memory 3. pipeline.batch.delay - 在單一個 Worker Thread 中等待新的 Event 被傳送進來的時間 (milliseconds ) > 參考 : > [logstash-settings-file](https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html) > [我的 ElasticSearch 調校之旅](https://medium.com/starbugs/%E6%88%91%E7%9A%84-elasticsearch-%E8%AA%BF%E6%A0%A1%E4%B9%8B%E6%97%85-89c380b5673c) > [Performance Troubleshooting](https://www.elastic.co/guide/en/logstash/current/performance-troubleshooting.html) ## 資料型態 - Boolean ```json= ssl_enable => true ``` - Number ```json= port => 33 ``` - String ```json= name => "Hello world" ``` - Hash hash 是一個key value pairs的集合,需注意使用空白而不是逗號分隔。 ```json= match => { "field1" => "value1" "field2" => "value2" ... } ``` - Lists ```json= path => [ "/var/log/messages", "/var/log/*.log" ] uris => [ "http://elastic.co", "http://example.net" ] ``` >參考 : >[configuration-file-structure](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html) ## Pipeline 設定檔 **Logstash Pipeline 設定檔放置於 /etc/logstash/conf.d** **設定檔結構:** ```json= # This is a comment. You should use comments to describe # parts of your configuration. input { # 輸入資料來源 } filter { # 資料處理 } output { # 輸出資料目的 } ``` >參考 : >[input-plugins](https://www.elastic.co/guide/en/logstash/current/input-plugins.html) >[filter-plugins](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) >[output-plugins](https://www.elastic.co/guide/en/logstash/current/output-plugins.html) ## Logstash 欄位存取 - 存取欄位的基本語法為[fieldname]。 - 如果是top-level欄位,則可以省略"[ ]"。 - 要引用nested欄位,請指定該欄位的完整路徑:[top-level field][nested field]。 #### 範例 - top-level fields (agent, ip, request, response, ua) - nested fields (status, bytes, os) **存取 request ,可使用 [request] 或簡寫 request 。** **存取 status ,使用[response][status]。** ```json { "agent": "Mozilla/5.0 (compatible; MSIE 9.0)", "ip": "192.168.24.44", "request": "/index.html" "response": { "status": 200, "bytes": 52353 }, "ua": { "os": "Windows 7" } } ``` ## Logstash Input plugin **以國際卡監控為範例:** **logstash.conf** 1. beats : - 在5044 Port 接收來自 Elastic Beats 框架的事件。 2. heartbeat : - Generates heartbeat events for testing。 - 固定時間產生 Event 避免 Output 長時間無連線,被防火牆中斷連線。 ```json= input { beats { port => 5044 } heartbeat { message => "sequence" interval => 30 type => "batch_log" } } ``` **grafanalog.conf** file : - path : 檔案路徑 - codec : 編碼器,使用 JSON 格式解析 - discover_interval : 讀取檔案的頻率,需再乘以 stat_interval (預設1秒) ```json= input { file { path => "/ICTM/log/grafana/grafana.log" codec => "json" discover_interval => 15 } } ``` **alert.conf** http : - 接收來在 Grafana Alert 事件 - host : Listen IP - port : Listen Port ```json= input { http { host => "127.0.0.1" port => 31311 } } ``` >參考 : >[input-plugins](https://www.elastic.co/guide/en/logstash/current/input-plugins.html) ## Logstash Filter ### Grok Filter 可以在 grok 裡預定義好命名正則表達式在稍後在(grok參數或者其他正則表達式裡)引用它。 正規表達式: ``` (?<queue_id>[0-9A-F]{10,11}) (?<card_unformat>{.*}) ``` grok 表達式 ``` %{SYNTAX:SEMANTIC} %{NUMBER:duration} %{IP:client} ``` **以國際卡監控為範例:** ``` [Timer-3] 00:00:52,496 INFO LogMoniData L:324 - {"Role":"950005","Link":"Stat_App","Level":"info","Status":"Normal","StartDate":"20201127","StartTime":"18:59","HeartBeat":"00:00"} ``` **從 card log(message) 分出 log 附加資訊(prefixdata) 跟 log Json 資料(card_unformat)** **再從 log 附加資訊(prefixdata) 細分出個欄位** ```json= # 1.retrive card log json data grok { match => ["message", "%{DATA:prefixdata} - (?<card_unformat>{.*})"] } grok { match => ["prefixdata", "\[%{DATA:thread}\]\s+%{DATA:aplogtime}\s+%{DATA:loglevel}\s+%{DATA:logclass}\s+%{DATA:logline}\s+"] } ``` >參考 : >[filter配置 | ELK 教程](http://docs.flycloud.me/docs/ELKStack/logstash/plugins/filter/index.html) >[filter-plugins](https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) ### JSON Filter 將 JSON 字串(card_unformat)轉換為物件,並將結果放入 target (message)欄位。 ```json= # 2. transform json data to es filed json { source => "card_unformat" target => "message" } ``` ### 條件判斷 比較運算式 : - equality: ==, !=, <, >, <=, >= - regexp: =~, !~ (checks a pattern on the right against a string value on the left) - inclusion: in, not in 布林運算式 : - and, or, nand, xor 一元運算式 : - ! ### 數據修改(Mutate) 提供了豐富的基礎類型數據處理能力,包括類型轉換,字串處理和欄位處理等。 字串處理: - gsub : 字串取代,僅對字串類型欄位有效 - split : 對字串類型欄位分割 - join : 僅對Arrary類型欄位有效 - merge : 合併兩個數組或者哈希欄位 欄位處理: - add_field : 新增欄位 - copy : 將欄位複製至新欄位 - rename : 重命名某個欄位,如果目的欄位已經存在,會被覆蓋掉 - update : 更新某個欄位的內容,如果欄位不存在,不會新建。 - replace : 作用和 update 類似,但是當欄位不存在的時候,它會起到 add_field 參數一樣的效果,自動添加新的欄位。 - remove_field : 刪除欄位,可一次刪除多個欄位 類型轉換: - 可以設置的轉換類型包括:“integer”,“float” 和 “string” ```json= filter { mutate { convert => { "fieldname" => "integer" "booleanfield" => "boolean" } } } ``` >參考 : >[plugins-filters-mutate](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) #### 範例1 (判斷欄位存在) **判斷 issBank 欄位是否存在,存在則將值帶入 issBank 不存在放入 "unknown"** 欄位處理: - add_field : 新增欄位 - copy : 將欄位複製至新欄位 ```json= # add issBank field for transaction_message if not exists # move message.issBank to issBank if [message][mti] and [message][issBank] { mutate { copy => { "[message][issBank]" => "issBank"} } } else if [message][mti] { mutate { add_field => { "issBank" => "unknown"} } } ``` #### 範例2 (比對字串在陣列中) **由不同來源定義不同欄位名稱如 isTimeout 或是 timeout ,都將值映射至 isTimeout 欄位** **使用 in 可比對是否包含於陣列中** ```json= # move message.isTimeout or message.timeout boolean value to isTimeout if [message][mti] and [message][isTimeout] in ["","true","false"] { mutate { copy => { "[message][isTimeout]" => "isTimeout"} } } else if [message][mti] and [message][timeout] in ["","true","false"] { mutate { copy => { "[message][timeout]" => "isTimeout"} } } ``` #### 範例3 (移除多餘欄位) **移除多餘欄位** ```json= # 7. remove unsued fields mutate { remove_field => [ "card_unformat", "[agent][ephemeral_id]", "[agent][hostname]", "[agent][id]", "ecs", "host", "tags", "type", "prefixdata" ] } ``` #### 範例4 (Ruby 處理) 資料處理功能若是無法滿足任務需求,可以透過 filters/ruby 插件處理。 **案例:測試套 Windows 主機校時出現晚1分鐘,造成 Grafana 告警誤判。** **原 @timestamp 是 FileBeat 採樣的時間 (並非實際 log 的時間)** **為避免監控系統會出現幾秒內的資料空白影響告警判斷,調整時間戳至最接近現在時間** **1. 新增 logstash_time 欄位並儲存當下時間** **2. 將 logstash_time 欄位值覆蓋至 @timestamp 欄位** ```json= ruby { code => "event.set('logstash_time', Time.now());" } mutate { rename => { "logstash_time" => "@timestamp" } } ``` #### 範例5(metrics) **測量Event 筆數** - events: - count:397740 (累積筆數) - rate_1m:6.886617278337198 (最近1分鐘每秒筆數) - rate_5m:6.726008812826743 (最近5分鐘每秒筆數) - rate_15m:6.668854310537374 (最近15分鐘每秒筆數) ```json= # 9. eps,events per sec metrics { meter => "events" add_tag => "metric" } ... else if "metric" in [tags] { elasticsearch { hosts => ["http://10.201.197.21:9200"] # unreconized messages index => "metric_log-%{+yyyy.MM.dd}" } } ``` >參考 : >[plugins-filters-metrics](https://www.elastic.co/guide/en/logstash/current/plugins-filters-metrics.html) ## Output plugins **輸出至 elasticsearch ,採1天產1個index方式** ```json= elasticsearch { hosts => ["http://10.201.197.21:9200"] index => "transaction_message-%{+yyyy.MM.dd}" } ``` >參考: >[output-plugins](https://www.elastic.co/guide/en/logstash/current/output-plugins.html) # 提高資料高可用性 (Data Resiliency) 資料流事件處理管道時,Logstash可能會遇到異常情況,造成服務異常終止情況。 為防止資料丟失並確保事件不間斷地流經管道,Logstash提供以下資料恢復功能。 1. Persistent Queues: - 將資料佇列儲存事件於內部硬碟,以防止資料丟失 2. Dead letter Queues: - Logstash無法處理的事件提供磁盤存儲。透過dead_letter_queue輸入插件輕鬆地重新處理dead letter queue中的事件。 ## Persistent queue 1. Logstash在管道階段 (input → pipeline worker) 之間使用內存中有佇列來緩衝事件。 這些記憶體中佇列的大小是固定的,不可配置。如果Logstash遇到臨時硬體故障,則記憶體中佇列的內容將丟失。為了防止異常終止期間的資料丟失,Logstash具有持久佇列功能,該功能將事件佇列存儲在硬碟上。 2. 過往透過訊息代理服務( Redis、RabbitMQ或 Kafka) 以促進訊息緩衝,現在可以使用持久佇列功能。 3. 使用Persistent Queues 會影響Logstash整體效能,尤其是當queue.type選擇persisted把資料存放在硬碟上,同步磁盤I/O會大大影響效能需要多開多Logstash管道才有辦法改善 ```flow st=>start: input e=>end: output op=>operation: persistent queue op2=>operation: filter st(right)->op(right)->op2(right)->e ``` 設定在pipeline如下: ```json= queue.type: persisted queue.max_bytes: 4gb ``` ## Dead letter queue 當Logstash遇到資料映射錯誤或其他問題而無法處理的事件時,Logstash管道會掛起或丟棄不成功的事件。為了防止在這種情況下丟失資料,配置Logstash將不成功的事件寫入dead letter queue而不是丟棄。 默認情況下是禁用,要使用需編輯 /etc/logstash/logstash.yml ```json= dead_letter_queue.enable: true path.dead_letter_queue: "path/to/data/dead_letter_queue" #設定資料存放位置 ``` 重新處理失敗的資料 可以再把失敗的資料接起來處理,使用dead_letter_queue套件即可 ```json= input { dead_letter_queue { path => "/path/to/data/dead_letter_queue" #dead_letter_queue設定的位置 commit_offsets => true #保存偏移量。管道重新啟動時,它將繼續從中斷的位置開始讀取,而不是重新處理隊列中的所有項目 pipeline_id => "main" } } ``` >參考 >[Day10-Logstash-Queues](https://ithelp.ithome.com.tw/articles/10237110) >[下一頁 :知識分享-ELK介紹 (2)](https://hackmd.io/@VlYX8fpjQQyfuaknhjK7eg/BkyANreid)