Logstash === --- 系統組態檔: Lostash會隨著安裝時配置以下檔案 - logstash.yml: 為系統主要組態檔 - pipeline.yml: 包含在單個lostash實例中運行多個管道的框架及說明 - jvm.options: 包含JVM組態設定 - log4j2.properties: log4j2 組態設定 - startup.options (Linux): 包含系統服務啟動時環境變數 --- Pipeline 設定檔: - Logstash設定檔需放置於/etc/logstash/conf.d --- # Logstash configuration --- ## Structure of a Config File ```json= # This is a comment. You should use comments to describe # parts of your configuration. input { ... } filter { ... } output { ... } ``` 運行Logstash設定檔 ```json= bin/logstash -f logstash-simple.conf ``` --- ## 資料型態 --- ### Boolean ```json= ssl_enable => true ``` --- ### Number ```json= port => 33 ``` --- ### String ```json= name => "Hello world" ``` --- ### Hash hash 是一個key value pairs的集合,需注意使用空白而不是逗號分隔。 ```json= match => { "field1" => "value1" "field2" => "value2" ... } ``` --- ### Lists ```json= path => [ "/var/log/messages", "/var/log/*.log" ] uris => [ "http://elastic.co", "http://example.net" ] ``` --- ### Bytes 支援SI (k M G T P E Z Y) 和 Binary (Ki Mi Gi Ti Pi Ei Zi Yi)單位 - Binary單位為base-1024 - SI單位為base-1000 - 不指定單位則以bytes表示 - 該欄位不分大小寫,接受數值與單位之間的空白 ```json= my_bytes => "1113" # 1113 bytes my_bytes => "10MiB" # 10485760 bytes my_bytes => "100kib" # 102400 bytes my_bytes => "180 mb" # 180000000 bytes ``` --- ### Codec Codec可用於資料輸入和輸出編解碼。 可用的[Codec Plugin](https://www.elastic.co/guide/en/logstash/6.8/codec-plugins.html) ```json= codec => "json" ``` --- ## Field References - 存取欄位的基本語法為[fieldname]。 - 如果您指的是top-level欄位,則可以省略"[ ]"。 - 要引用nested欄位,請指定該欄位的完整路徑:[top-level field][nested field]。 --- 下列範例 - top-level fields (agent, ip, request, response, ua) - nested fields (status, bytes, os) ```json { "agent": "Mozilla/5.0 (compatible; MSIE 9.0)", "ip": "192.168.24.44", "request": "/index.html" "response": { "status": 200, "bytes": 52353 }, "ua": { "os": "Windows 7" } } ``` --- ## 條件判斷 比較運算式 ```json= equality: ==, !=, <, >, <=, >= regexp: =~, !~ (checks a pattern on the right against a string value on the left) inclusion: in, not in ``` 布林運算式 ```json= and, or, nand, xor ``` 一元運算式 ```json= ! ``` --- 範例 ```json= output { # Send production errors to pagerduty if [loglevel] == "ERROR" and [deployment] == "production" { pagerduty { ... } } } ``` --- ## Logstash Input plugin https://www.elastic.co/guide/en/logstash/current/input-plugins.html --- - File - http - Jdbc - Log4j - Stdin - Tcp - Udp .... --- ## 標準輸入(stdin) 從標準輸入讀取事件,每個事件都是一行,如果要多行事件要成為單一事件,則需要使用多線編解碼器(multiline)。 ``` input { stdin { codec => "plain" id => "my_plugin_id" } } output { stdout { codec => rubydebug } } ``` --- ## 讀取文件(File) Logstash 會監聽文件變化。而且會記錄一個叫.sincedb的資料庫文件來跟踪被監聽的日誌文件的當前讀取位置。不用擔心 logstash會遺漏資料。 ``` input { file { path => ["/var/log/*.log","/var/log/**/*.log"] id => "my_plugin_id" } } ``` --- ## 讀取網路數據 (TCP) 通過TCP Socket取事件,支持log4j2 logs ``` input { tcp { port => 8888 mode => "server" ssl_enable => false } } output { stdout { codec => rubydebug } } ``` --- # Output plugins https://www.elastic.co/guide/en/logstash/current/output-plugins.html --- - Csv - Email - File - http - Tcp - Stdout ... --- ## 標準輸出(Stdout) 一個簡單的輸出到運行中Logstash的標準輸出。 調試插件配置時,通過允許在通過輸入和過濾器後立即訪問事件數據,此輸出非常方便。 ``` output { stdout { codec => json } } ``` --- ## 保存成文件(File) 輸出將事件寫入磁碟上,可以將事件中的字段以文件名稱/或路徑的一部分。預設情況下,此輸出以json格式每行寫入一個事件。 ``` output { file { path => ... codec => line { format => "custom format: %{message}"} } } ``` --- ## 調用命令執行(Exec) exec輸出將為收到的每個事件運行命令。 將使用Ruby的system()函數,命令字符串將傳遞給shell。 在命令中可以使用%{name}和其他動態字符串將選擇字段從事件傳遞到子進程。 ``` output { if [type] == "abuse" { exec { command => "iptables -A INPUT -s %{clientip} -j DROP" } } } ``` --- ## 輸出到Elasticsearch 將輸出將事件輸出到Elasticsearch。Elasticsearch 使用 bulk API 進行大量資料傳輸。 ``` output { elasticsearch { hosts => ["http://127.0.0.1:9200"] index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}" } } ``` --- # logstash filter https://www.elastic.co/guide/en/logstash/current/filter-plugins.html 過濾器插件對事件執行中間處理。 --- - Grok - Json - Mutate - Ruby - Split ... --- ## Grok 正則捕獲 Grok 是 Logstash 最重要的插件。你可以在 grok 裡預定義好命名正則表達式在稍後在(grok參數或者其他正則表達式裡)引用它。 Examples ``` 55.3.244.1 GET /index.html 15824 0.043 ``` pattern ``` %{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} ``` --- 程式範例 ``` input { file { path => "/var/log/http.log" } } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } } ``` 處理結果 ``` client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043 ``` --- ## 數據修改(Mutate) - filters/mutate 插件是 Logstash 另一個重要插件。它提供了豐富的基礎類型數據處理能力。包括類型轉換,字符串處理和字段處理等。 - 類型轉換:可以設置的轉換類型包括:"integer","float" 和 "string" --- ``` filter { mutate { convert => { "fieldname" => "integer" "booleanfield" => "boolean" } } } ``` --- ## 數據修改(Mutate) 字符串處理 --- - gsub :字串取代,僅對字串類型字段有效 ``` filter { mutate { gsub => [ # replace all forward slashes with underscore "fieldname", "/", "_", # replace backslashes, question marks, hashes, and minuses # with a dot "." "fieldname2", "[\\?#-]", "." ] } } ``` --- - split: 對字串類型字段分割 ``` filter { mutate { split => ["message", "|"] } } ``` --- - join:僅對Arrary類型字段有效 ``` filter { mutate { join => { "fieldname" => "," } } } ``` --- - merge:合併兩個數組或者哈希字段 ``` `array` + `string` will work `string` + `string` will result in an 2 entry array in `dest_field` `array` and `hash` will not work filter { mutate { merge => { "dest_field" => "added_field" } } } ``` --- # 數據修改(Mutate) 字段處理 --- - rename: 重命名某個字段,如果目的字段已經存在,會被覆蓋掉 ``` filter { mutate { split => { "hostname" => "." } add_field => { "shortHostname" => "%{[hostname][0]}" } } mutate { rename => ["shortHostname", "hostname" ] } } ``` --- - update: 更新某個字段的內容,如果字段不存在,不會新建。 ``` filter { mutate { update => { "sample" => "My new message" } } } ``` --- - replace: 作用和 update 類似,但是當字段不存在的時候,它會起到 add_field 參數一樣的效果,自動添加新的字段。 ``` filter { mutate { replace => { "message" => "%{source_host}: My new message" } } } ``` --- ## JSON filter 預設情況下,JSON解析過濾器它將解析的JSON放在Logstash事件的root(頂層)中,但是可以使用此配置將此過濾器配置為將JSON放入任何任意事件字段中 `target`。 --- ``` filter { json { source => "message" target => "jsoncontent" } } ``` --- ## Ruby 處理 - 資料處理功能若是無法滿足任務需求,可以透過 Ruby 處理,filters/ruby 插件將會是一個非常有用的工具。 --- ``` filter{ date { match => ["datetime" , "UNIX"] } ruby { code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs" } } ``` 透過 ruby code 進行時間判斷,濾掉時間範圍與當前時間差距太大的數據。 --- # Data Resiliency 資料流事件處理管道時,Logstash可能會遇到異常情況,造成服務異常終止情況。 為防止資料丟失並確保事件不間斷地流經管道,Logstash提供以下資料恢復功能。 --- 1. Persistent Queues: - 將資料佇列儲存事件於內部硬碟,以防止資料丟失 2. Dead letter Queues: - Logstash無法處理的事件提供磁盤存儲。透過dead_letter_queue輸入插件輕鬆地重新處理dead letter queue中的事件。 --- ## Persistent queue 1. Logstash在管道階段 (input → pipeline worker) 之間使用內存中有佇列來緩衝事件。 這些記憶體中佇列的大小是固定的,不可配置。如果Logstash遇到臨時硬體故障,則記憶體中佇列的內容將丟失。為了防止異常終止期間的資料丟失,Logstash具有持久佇列功能,該功能將事件佇列存儲在硬碟上。 2. 過往透過訊息代理服務( Redis、RabbitMQ或 Kafka) 以促進訊息緩衝,現在可以使用持久佇列功能。 --- input→persistent queue→filter→output 設定在pipeline如下: ``` queue.type: persisted queue.max_bytes: 4gb ``` --- ## Dead letter queue 當Logstash遇到資料映射錯誤或其他問題而無法處理的事件時,Logstash管道會掛起或丟棄不成功的事件。為了防止在這種情況下丟失資料,配置Logstash將不成功的事件寫入dead letter queue而不是丟棄。 --- 默認情況下是禁用,要使用需編輯 /etc/logstash/logstash.yml ``` dead_letter_queue.enable: true path.dead_letter_queue: "path/to/data/dead_letter_queue" #設定資料存放位置 ``` --- 重新處理失敗的資料 可以再把失敗的資料接起來處理,使用dead_letter_queue套件即可 ``` input { dead_letter_queue { path => "/path/to/data/dead_letter_queue" #dead_letter_queue設定的位置 commit_offsets => true #保存偏移量。管道重新啟動時,它將繼續從中斷的位置開始讀取,而不是重新處理隊列中的所有項目 pipeline_id => "main" } } ``` --- # 敬請指教 ----
{"metaMigratedAt":"2023-06-16T00:31:32.961Z","metaMigratedFrom":"Content","title":"Logstash","breaks":true,"contributors":"[{\"id\":\"565617f1-fa63-410c-9fb9-a9278632bb7a\",\"add\":10345,\"del\":2141}]"}
    710 views