# 設置和管理 Elasticsearch ILM 策略,Logstash 配置以及測試 ## 安裝Elasticsearch 和 logstash ,part1 ### 1. 安裝 install elasticsearch ```shell wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-amd64.deb sudo dpkg -i elasticsearch-8.3.3-amd64.deb ``` 需要記得密碼,如果忘記,可以用下面指令重新產生密碼: ``` sudo /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic ``` install logstash ```shell= sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-8.14.1-amd64.deb sudo dpkg -i logstash-8.14.1-amd64.deb ``` ### 2. 配置 ```shell sudo vim /etc/elasticsearch/elasticsearch.yml ``` 暫時不需要修改。 ### 3. 啟動 Elasticsearch ```shell sudo systemctl start elasticsearch sudo systemctl enable elasticsearch ``` 檢查 Elasticsearch 是否正常運行: ```shell= curl -kX GET -u elastic:${pswd} "https://localhost:9200/" ``` 應該會返回類似以下的 JSON 響應: ```json { "name" : "node-1", "cluster_name" : "elk-cluster", "cluster_uuid" : "xxxxxxxxxxxxxx", "version" : { "number" : "7.17.2", "build_flavor" : "default", "build_type" : "deb", "build_hash" : "xxxxxxxxxxxxxx", "build_date" : "2022-xx-xxTxx:xx:xx.000Z", "qualifier" : "final" }, "tagline" : "You Know, for Search" } ``` --- ## 更改 ILM 策略 ### 1. 定義 ILM 策略 設置索引在達到 25GB 或 180 天後刪除: ```bash curl -X PUT -u elastic:${pswd} "https://localhost:9200/_ilm/policy/log-retention-policy" -H 'Content-Type: application/json' -d' { "policy": { "phases": { "hot": { "actions": { "rollover": { "max_size": "25gb" } } }, "delete": { "min_age": "180d", "actions": { "delete": {} } } } } } ' ``` ### 2. 創建索引模板 創建索引模板來應用 ILM 策略到新索引: ```bash curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/log-template" -H 'Content-Type: application/json' -d' { "index_patterns": ["log-*"], "settings": { "index.lifecycle.name": "log-retention-policy", "index.lifecycle.rollover_alias": "log-alias" } } ' ``` ### 3. 創建初始索引 創建初始索引並設置別名: ```bash curl -kX PUT -u elastic:${pswd} "https://localhost:9200/log-000001" -H 'Content-Type: application/json' -d' { "aliases": { "log-alias": { "is_write_index": true } } } ' ``` ### 4. 驗證 ILM 策略 檢查索引的 ILM 狀態: ```bash curl -kX GET -u elastic:${pswd} "https://localhost:9200/log-000001/_ilm/explain?pretty" ``` --- ## 配置 Logstash 和 Elasticsearch ### 1. 配置 Elasticsearch 節點 確保每個節點的 Elasticsearch 配置文件 `/etc/elasticsearch/elasticsearch.yml` 如下: ```yaml cluster.name: elk-cluster node.name: <unique-node-name> node.master: true node.data: true network.host: 0.0.0.0 http.port: 9200 discovery.seed_hosts: ["10.1.1.20", "10.1.1.21"] cluster.initial_master_nodes: ["node-1", "node-2"] ``` 重啟 Elasticsearch 服務: ```bash sudo systemctl restart elasticsearch ``` ### 2. 配置 Logstash 節點 Logstash 配置 `/etc/logstash/conf.d/logstash.conf` : ```shell= input { beats { port => 5044 } } output { elasticsearch { hosts => ["http://10.1.1.20:9200", "http://10.1.1.21:9200"] index => "log-%{+YYYY.MM.dd}" ilm_enabled => true ilm_rollover_alias => "log-alias" ilm_policy => "log-retention-policy" } stdout { codec => rubydebug } } ``` 重啟 Logstash 服務: ```bash sudo systemctl restart logstash ``` ### 3. 配置 Logstash效能設置 ```bash= sudo vim /etc/logstash/logstash.yml ``` ```bash= # logstash.yml pipeline.workers: 4 # 設置工作者數量,默認為 CPU 核心數 pipeline.batch.size: 125 # 設置每個批次的事件數量,默認為 125 pipeline.batch.delay: 50 # 設置批次延遲(毫秒),默認為 50ms ``` ```bash= sudo vim /etc/logstash/pipelines.yml ``` ```bash= # pipelines.yml - pipeline.id: main path.config: "/etc/logstash/conf.d/*.conf" pipeline.workers: 4 # 設置工作者數量 pipeline.batch.size: 125 # 設置每個批次的事件數量 pipeline.batch.delay: 50 # 設置批次延遲(毫秒) ``` ```bash= sudo vim /etc/logstash/jvm.options ``` ```bash= # jvm.options -Xms4g # 設置最小堆內存為 4GB -Xmx4g # 設置最大堆內存為 4GB ``` 重啟並驗證設置 ```bash= sudo systemctl restart logstash ``` ```bash= sudo tail -f /var/log/logstash/logstash-plain.log ``` --- ## 插入和查詢數據 ### 1. 插入測試數據 ```bash for i in {1..100}; do curl -X POST "http://10.1.1.22:5044" -H 'Content-Type: application/json' -d "{\"message\": \"This is test log entry $i\"}" done ``` ### 1.1 效率 ```python= import time import requests from concurrent.futures import ThreadPoolExecutor, as_completed LOGSTASH_SERVER = "http://10.1.1.22:5044" NUM_REQUESTS_PER_SECOND = 800 def send_log(entry_number): data = {"message": f"This is test log entry {entry_number}"} start_time = time.time() try: requests.post(LOGSTASH_SERVER, json=data) except requests.exceptions.RequestException as e: pass end_time = time.time() return end_time - start_time def test_logstash_load(num_requests): total_time = 0 response_times = [] with ThreadPoolExecutor(max_workers=num_requests) as executor: futures = [executor.submit(send_log, i) for i in range(num_requests)] for future in as_completed(futures): response_time = future.result() response_times.append(response_time) total_time += response_time average_time = total_time / num_requests print(f"Number of requests: {num_requests}") print(f"Average response time: {average_time:.4f} seconds") print() if __name__ == "__main__": while True: print(f"Testing with {NUM_REQUESTS_PER_SECOND} requests per second") start_time = time.time() test_logstash_load(NUM_REQUESTS_PER_SECOND) end_time = time.time() print(f"Total duration for {NUM_REQUESTS_PER_SECOND} requests: {end_time - start_time:.2f} seconds") time.sleep(1) ``` 單台插入效能大約為 500/sec, 機器為1core 8G ### 2. 查詢插入的數據 ```bash curl -X GET "http://10.1.1.20:9200/log-*/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match_all": {} }, "size": 1000 }' ``` #### 2.1 效率 ```python= import time import requests from concurrent.futures import ThreadPoolExecutor, as_completed ELASTICSEARCH_SERVER = "http://10.1.1.27:80" # 使用 nginx 代理的 IP 和端口 NUM_REQUESTS_PER_SECOND = 300 QUERY = { "query": { "match_all": {} }, "size": 100 # 每次查詢返回的文檔數量 } def perform_query(query_number): start_time = time.time() try: response = requests.get(f"{ELASTICSEARCH_SERVER}/log-*/_search", json=QUERY) response.raise_for_status() except requests.exceptions.RequestException as e: print(f"Error during query {query_number}: {e}") end_time = time.time() return end_time - start_time def test_elasticsearch_query_load(num_requests): total_time = 0 response_times = [] with ThreadPoolExecutor(max_workers=num_requests) as executor: futures = [executor.submit(perform_query, i) for i in range(num_requests)] for future in as_completed(futures): response_time = future.result() response_times.append(response_time) total_time += response_time average_time = total_time / num_requests print(f"Number of requests: {num_requests}") print(f"Average response time: {average_time:.4f} seconds") print(f"Total duration for {num_requests} requests: {total_time:.2f} seconds") if __name__ == "__main__": while True: print(f"Testing with {NUM_REQUESTS_PER_SECOND} requests per second") start_time = time.time() test_elasticsearch_query_load(NUM_REQUESTS_PER_SECOND) end_time = time.time() time.sleep(max(0, 1 - (end_time - start_time))) # 確保每秒執行一次 ``` 對兩台1core 8G的 Elasticsearch 效能為 300次/sec, 每次查詢100筆資料 ## log傳輸至logstash ### 1. install Filebeat ```bash= wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add - sudo sh -c 'echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" > /etc/apt/sources.list.d/elastic-7.x.list' sudo apt update sudo apt install filebeat ``` ### 2. 設置 filebeat ```bash= sudo vim /etc/filebeat/filebeat.yml filebeat.inputs: - type: log enabled: true paths: - /var/log/*.log output.logstash: hosts: ["10.1.1.22:5044", "10.1.1.23:5044", "10.1.1.24:5044", "10.1.1.25:5044"] ``` filebeat.inputs:定義要收集的日誌文件路徑。 output.logstash:指定 Logstash 節點的地址。 #### enable and restart ```bash= sudo filebeat modules enable system sudo systemctl start filebeat sudo systemctl enable filebeat # confirm setting sudo tail -f /var/log/filebeat/filebeat.log ``` #### testing ```shell= for i in {1..100}; do echo "Test log entry $i" >> /var/log/test.log done ``` #### check elasticsearch data ```bash= curls -kX GET -u elastic:${pswd} "https://localhost:9200/log-*/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match_all": {} }, "size": 1000 }' ``` ## 將不同種類log傳送至指定indices - 改以 rsyslog 改為判斷log內容並根據內容送到指定的logstash上面 ,每台logstash都會要指定送到哪個indices上面 ### install and setup rsyslog, 根據log自動選擇logstash ```bash= sudo apt-get install rsyslog sudo apt-get install rsyslog-elasticsearch ``` ```shell= sudo vim /etc/rsyslog.d/50-elasticsearch.conf module(load="omfwd") template(name="LogstashFormat" type="string" string="<%PRI%>%TIMESTAMP:::date-rfc3339% %HOSTNAME% %syslogtag%%msg%\n") if $msg contains "specific" then { action( type="omfwd" target="10.1.1.22" port="5044" protocol="tcp" template="LogstashFormat" ) } else { action( type="omfwd" target="10.1.1.23" port="5044" protocol="tcp" template="LogstashFormat" ) } ``` - restart rsyslog ```bash= sudo systemctl restart rsyslog ``` ### setup logstash ```bash= sudo vim /etc/logstash/conf.d/logstash.conf ``` - for logstash (specific-log) ```shell= input { tcp { port => 5044 type => "syslog" } } output { elasticsearch { hosts => ["http://10.1.1.20:9200"] index => "specific-log-index-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } ``` - for default-log ```bash= sudo vim /etc/logstash/conf.d/logstash.conf ``` ```shell= input { tcp { port => 5044 type => "syslog" } } output { elasticsearch { hosts => ["http://10.1.1.21:9200"] index => "default-log-index-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } } ``` - restart logstash ```bash= sudo systemctl restart logstash ``` - testing ```bash= logger "This is a test message for specific index" logger "This is a test message for default index" ``` - 確認indices分配狀態 ```shell= curl -X GET "http://10.1.1.20:9200/specific-log-index-*/_search?pretty" curl -X GET "http://10.1.1.21:9200/default-log-index-*/_search?pretty" ``` - create indices template ```shell= curl -X PUT "http://10.1.1.21:9200/_template/log-template" -H 'Content-Type: application/json' -d' { "index_patterns": ["*-log-index-*"], "settings": { "index.lifecycle.name": "log-retention-policy", "index.lifecycle.rollover_alias": "log-alias" } } ' ``` - setup indices ```bash= # 確認不衝突 curl -X DELETE "http://10.1.1.21:9200/specific-log-index" curl -X DELETE "http://10.1.1.21:9200/default-log-index" # 創建 curl -X PUT "http://10.1.1.21:9200/specific-log-index-000001" -H 'Content-Type: application/json' -d' { "aliases": { "specific-log-index": { "is_write_index": true } } } ' curl -X PUT "http://10.1.1.21:9200/default-log-index-000001" -H 'Content-Type: application/json' -d' { "aliases": { "default-log-index": { "is_write_index": true } } } ' ``` - modify ILM strategy ```bash= curl -X PUT "http://10.1.1.21:9200/_template/log-template" -H 'Content-Type: application/json' -d' { "index_patterns": ["*-log-alias-*"], "settings": { "index.lifecycle.name": "log-retention-policy", "index.lifecycle.rollover_alias": "log-alias" } } ' ``` - confirm setup succeed (with rsyslog) ```bash= logger "This is a test message for specific index" logger "This is a test message for default index" ``` - check in elasticSearch ```bash= curl -X GET "http://10.1.1.20:9200/specific-log-index/_search?pretty" curl -X GET "http://10.1.1.20:9200/default-log-index/_search?pretty" #查看indices curl -X GET "http://10.1.1.20:9200/_cat/indices?h=index" curl -X GET "http://10.1.1.20:9200/_settings?pretty" curl -X GET "http://10.1.1.20:9200/_mapping?pretty" ``` ### kibana ```python= from flask import Flask, request, jsonify import requests import json app = Flask(__name__) KIBANA_URL = "http://localhost:5601" # 修改为你的 Kibana 实例地址 HEADERS = {"kbn-xsrf": "true", "Content-Type": "application/json"} @app.route('/create_index_patterns_and_visualizations', methods=['POST']) def create_index_patterns_and_visualizations(): data = request.json index_patterns = data.get('index_patterns') if not index_patterns: return jsonify({"error": "index_patterns is required"}), 400 results = [] for pattern in index_patterns: index_pattern = pattern.get('index_pattern') time_field = pattern.get('time_field', '@timestamp') if not index_pattern: results.append({"index_pattern": index_pattern, "error": "index_pattern is required"}) continue # 创建 Index Pattern index_pattern_payload = { "attributes": { "title": index_pattern, "timeFieldName": time_field } } response = requests.post(f"{KIBANA_URL}/api/saved_objects/index-pattern", headers=HEADERS, json=index_pattern_payload) if response.status_code != 200: results.append({"index_pattern": index_pattern, "error": "Failed to create index pattern", "details": response.json()}) continue # 获取 Index Pattern ID index_pattern_id = response.json()['id'] # 创建 Visualization visualization_payload = { "attributes": { "title": f"Visualization for {index_pattern}", "visState": json.dumps({ "title": f"Visualization for {index_pattern}", "type": "line", "params": { "type": "line", "grid": { "categoryLines": False, "style": { "color": "#eee" } } }, "aggs": [ { "id": "1", "enabled": True, "type": "count", "schema": "metric", "params": {} }, { "id": "2", "enabled": True, "type": "date_histogram", "schema": "segment", "params": { "field": time_field, "interval": "auto", "min_doc_count": 1, "extended_bounds": {} } } ], "listeners": {} }), "uiStateJSON": "{}", "description": "", "version": 1, "kibanaSavedObjectMeta": { "searchSourceJSON": json.dumps({ "index": index_pattern_id, "query": { "query": "", "language": "kuery" }, "filter": [] }) } } } response = requests.post(f"{KIBANA_URL}/api/saved_objects/visualization", headers=HEADERS, json=visualization_payload) if response.status_code != 200: results.append({"index_pattern": index_pattern, "error": "Failed to create visualization", "details": response.json()}) continue results.append({"index_pattern": index_pattern, "message": "Index pattern and visualization created successfully"}) return jsonify(results), 200 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000) ``` --- ## Alert v1 ```python3= import requests import json import schedule import time from datetime import datetime def send_alert(log): print("ALERT: Detected 500 error log!") print(json.dumps(log, indent=4)) def query_elasticsearch(): url = "http://10.1.1.20:9200/logstash-*/_search?pretty" headers = { 'Content-Type': 'application/json' } query = { "query": { "range": { "@timestamp": { "gte": "now-30s" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] } response = requests.get(url, headers=headers, data=json.dumps(query)) if response.status_code == 200: results = response.json() hits = results.get('hits', {}).get('hits', []) if hits: print(f"Logs from the last minute ({datetime.now()}):") for hit in hits: log = hit["_source"] if "500" in log.get("message", ""): send_alert(log) else: print(f"No logs found in the last minute ({datetime.now()}).") else: print(f"Failed to fetch logs: {response.status_code} - {response.text}") # Schedule the task to run every minute schedule.every(30).seconds.do(query_elasticsearch) print("Starting log query script. Press Ctrl+C to exit.") # Run the scheduler try: while True: schedule.run_pending() time.sleep(1) except KeyboardInterrupt: print("Script terminated by user.") ``` ## DDOS Alert v1 ```py import schedule import time import requests from datetime import datetime, timedelta ELASTICSEARCH_URL = "http://10.1.1.20:9200" TIME_FIELD = "@timestamp" HEADERS = {"Content-Type": "application/json"} history = {} counter = {} rate = 2 # 一個倍率,根據實際狀況進行調整 def send_alert(log): print("ALERT: Detected DDOS", log) # 回傳過去一小時的訪問次數 def get_logs_count_last_hour(): now = datetime.utcnow() one_hour_ago = now - timedelta(hours=1) query = { "query": { "bool": { "must": [ { "range": { TIME_FIELD: { "gte": one_hour_ago.isoformat(), "lte": now.isoformat(), "format": "strict_date_optional_time" } } } ] } } } # Elasticsearch 的查詢語法 response = requests.post(f"{ELASTICSEARCH_URL}/logstash-*/_count", headers=HEADERS, json=query) if response.status_code != 200: print(f"Failed to query logs count at {now.isoformat()}: {response.json()}") return count = response.json().get('count', 0) return count def job(): """ 取得過去一小時的訪問次數,然後判斷是否超過該時間段的平均值, 如果超過,就發送警告。 以及將這筆資料記錄下來 """ c = get_logs_count_last_hour() h = datetime.now().hour s = history[h] // counter[c] if c >= s * rate: send_alert(c // s) history[h] += c counter[h] += 1 # Schedule job to run at the top of every hour schedule.every().hour.at(":00").do(job) if __name__ == '__main__': while True: schedule.run_pending() time.sleep(1) ``` ## Line Bot 先申請一個 line bot:https://ithelp.ithome.com.tw/m/articles/10215268。 ```python import json import requests from flask import Flask, request, jsonify ChannelAccessToken = '' # Place your access token here. USER_IDS_FILE = 'user_ids.txt' app = Flask(__name__) @app.route('/', methods=['GET', 'POST']) def index(): return 'Hello, World!', 200 @app.route('/webhook', methods=['POST']) def webhook(): data = request.get_json() for event in data['events']: userId = event['source']['userId'] userMessage = event['message']['text'] if userMessage == 'abcd1234': # Place a password here. print(f"Received special message from user {userId}: {userMessage}") with open(USER_IDS_FILE, 'a') as f: f.write(f"{userId}\n") print(f"Recorded userId: {userId}") return 'OK', 200 @app.route('/send_alert', methods=['POST']) def send_alert(): data = request.get_json() if 'message' not in data: return jsonify({"error": "Missing 'message' in request body"}), 400 alert_message = data['message'] try: with open(USER_IDS_FILE, 'r') as f: user_ids = f.readlines() user_ids = [uid.strip() for uid in user_ids if uid.strip()] if not user_ids: return jsonify({"error": "No user IDs found"}), 404 for userId in user_ids: payload = { "to": userId, "messages": [ { "type": "text", "text": alert_message } ] } headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {ChannelAccessToken}' } response = requests.post('https://api.line.me/v2/bot/message/push', headers=headers, data=json.dumps(payload)) if response.status_code != 200: print(f"Failed to send message to {userId}: {response.text}") return jsonify({"status": "Alert sent to all users"}), 200 except FileNotFoundError: return jsonify({"error": "User ID file not found"}), 404 if __name__ == '__main__': app.run(host='0.0.0.0', port=80, debug=True) ``` 使用者發送密碼給 line bot,bot 就會記錄使用者 id,當 alert 發送時,會發送給所有被記錄的使用者。 ``` import requests import json def send_alert(message): api_url = "http://your-server-ip/send_alert" payload = { "message": message } headers = { "Content-Type": "application/json" } try: response = requests.post(api_url, headers=headers, data=json.dumps(payload)) print(f"Status Code: {response.status_code}") print(f"Response: {response.text}") except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") ``` ## Alert v2 ```python3= import requests import json import schedule import time from datetime import datetime def send_alert(log): print("ALERT: Detected 500 error log!") print(json.dumps(log, indent=4)) def query_elasticsearch(): url = "http://10.1.1.20:9200/logstash-*/_search?pretty" headers = { 'Content-Type': 'application/json' } query = { "query": { "range": { "@timestamp": { "gte": "now-30s" } } }, "sort": [ { "@timestamp": { "order": "desc" } } ] } response = requests.get(url, headers=headers, data=json.dumps(query)) if response.status_code == 200: results = response.json() hits = results.get('hits', {}).get('hits', []) if hits: print(f"Logs from the last minute ({datetime.now()}):") for hit in hits: log = hit["_source"] send_alert(log) else: print(f"No logs found in the last minute ({datetime.now()}).") else: print(f"Failed to fetch logs: {response.status_code} - {response.text}") # Schedule the task to run every minute schedule.every(30).seconds.do(query_elasticsearch) print("Starting log query script. Press Ctrl+C to exit.") # Run the scheduler try: while True: schedule.run_pending() time.sleep(1) except KeyboardInterrupt: print("Script terminated by user.") ``` --- ### 設定 Kibana Rules 點擊左邊 Menu: ![image](https://hackmd.io/_uploads/BkOpNovCR.png) 選擇 Stack Management: ![image](https://hackmd.io/_uploads/BkT2VsDCA.png) 選擇 Rules and Insights: ![image](https://hackmd.io/_uploads/SJKgBjP0R.png) Create Rule: ![image](https://hackmd.io/_uploads/r11MrowCR.png) 選擇下面的 Elasticsearch Query: ![image](https://hackmd.io/_uploads/SkTXHiD00.png) 就可以編輯 query 語法了: ![image](https://hackmd.io/_uploads/SkYOHsDAR.png) 具體可以參考 [Doc](https://www.elastic.co/guide/en/kibana/current/rule-type-es-query.html)。