# Setting up and managing Elasticsearch ILM policies, Logstash configuration, and testing
## Install Elasticsearch and Logstash, part 1
### 1. Installation
Install Elasticsearch:
```shell
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.14.3-amd64.deb
sudo dpkg -i elasticsearch-8.14.3-amd64.deb
```
Take note of the password generated during installation. If you forget it, you can regenerate it with:
```
sudo /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic
```
Install Logstash:
```shell=
sudo wget https://artifacts.elastic.co/downloads/logstash/logstash-8.14.1-amd64.deb
sudo dpkg -i logstash-8.14.1-amd64.deb
```
### 2. Configuration
```shell
sudo vim /etc/elasticsearch/elasticsearch.yml
```
No changes are needed for now.
### 3. Start Elasticsearch
```shell
sudo systemctl start elasticsearch
sudo systemctl enable elasticsearch
```
Check that Elasticsearch is running:
```shell=
curl -kX GET -u elastic:${pswd} "https://localhost:9200/"
```
It should return a JSON response similar to the following:
```json
{
"name" : "node-1",
"cluster_name" : "elk-cluster",
"cluster_uuid" : "xxxxxxxxxxxxxx",
"version" : {
"number" : "7.17.2",
"build_flavor" : "default",
"build_type" : "deb",
"build_hash" : "xxxxxxxxxxxxxx",
"build_date" : "2022-xx-xxTxx:xx:xx.000Z",
"qualifier" : "final"
},
"tagline" : "You Know, for Search"
}
```
---
## Configure the ILM policy
### 1. Define the ILM policy
Create a policy that rolls an index over once it reaches 25 GB and deletes it 180 days after rollover:
```bash
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_ilm/policy/log-retention-policy" -H 'Content-Type: application/json' -d'
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "25gb"
}
}
},
"delete": {
"min_age": "180d",
"actions": {
"delete": {}
}
}
}
}
}
'
```
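To confirm the policy was stored, you can fetch it back (a quick sanity check, using the same credentials):
```bash
curl -kX GET -u elastic:${pswd} "https://localhost:9200/_ilm/policy/log-retention-policy?pretty"
```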
### 2. Create an index template
Create an index template that applies the ILM policy to new indices:
```bash
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/_template/log-template" -H 'Content-Type: application/json' -d'
{
"index_patterns": ["log-*"],
"settings": {
"index.lifecycle.name": "log-retention-policy",
"index.lifecycle.rollover_alias": "log-alias"
}
}
'
```
### 3. Create the initial index
Create the initial index and set its write alias:
```bash
curl -kX PUT -u elastic:${pswd} "https://localhost:9200/log-000001" -H 'Content-Type: application/json' -d'
{
"aliases": {
"log-alias": {
"is_write_index": true
}
}
}
'
```
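To verify that the alias points at the new index and is marked as the write index:
```bash
curl -kX GET -u elastic:${pswd} "https://localhost:9200/_alias/log-alias?pretty"
```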
### 4. Verify the ILM policy
Check the ILM status of the index:
```bash
curl -kX GET -u elastic:${pswd} "https://localhost:9200/log-000001/_ilm/explain?pretty"
```
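If you don't want to wait for the index to reach 25 GB, you can exercise the rollover path manually against the write alias (optional):
```bash
curl -kX POST -u elastic:${pswd} "https://localhost:9200/log-alias/_rollover?pretty"
```
A new index `log-000002` should be created and become the write index for `log-alias`.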
---
## Configure Logstash and Elasticsearch
### 1. Configure the Elasticsearch nodes
Make sure each node's `/etc/elasticsearch/elasticsearch.yml` contains:
```yaml
cluster.name: elk-cluster
node.name: <unique-node-name>
node.roles: [ master, data ]
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["10.1.1.20", "10.1.1.21"]
cluster.initial_master_nodes: ["node-1", "node-2"]
```
Restart the Elasticsearch service:
```bash
sudo systemctl restart elasticsearch
```
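After both nodes are restarted, a cluster health call should report `"number_of_nodes" : 2` (adjust the scheme and credentials depending on whether security is enabled on your cluster):
```bash
curl -kX GET -u elastic:${pswd} "https://localhost:9200/_cluster/health?pretty"
```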
### 2. Configure the Logstash nodes
Logstash pipeline configuration in `/etc/logstash/conf.d/logstash.conf`:
```shell=
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://10.1.1.20:9200", "http://10.1.1.21:9200"]
index => "log-%{+YYYY.MM.dd}"
ilm_enabled => true
ilm_rollover_alias => "log-alias"
ilm_policy => "log-retention-policy"
}
stdout { codec => rubydebug }
}
```
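Optionally, validate the pipeline syntax before restarting (paths assume the default Debian package layout):
```bash
sudo -u logstash /usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit
```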
Restart the Logstash service:
```bash
sudo systemctl restart logstash
```
### 3. Tune Logstash performance settings
```bash=
sudo vim /etc/logstash/logstash.yml
```
```bash=
# logstash.yml
pipeline.workers: 4       # number of worker threads; defaults to the number of CPU cores
pipeline.batch.size: 125  # events per batch; default is 125
pipeline.batch.delay: 50  # batch delay in milliseconds; default is 50 ms
```
```bash=
sudo vim /etc/logstash/pipelines.yml
```
```bash=
# pipelines.yml
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
  pipeline.workers: 4       # number of worker threads
  pipeline.batch.size: 125  # events per batch
  pipeline.batch.delay: 50  # batch delay in milliseconds
```
```bash=
sudo vim /etc/logstash/jvm.options
```
```bash=
# jvm.options
# Set both the minimum and maximum heap size to 4 GB
-Xms4g
-Xmx4g
```
Restart Logstash and verify the settings:
```bash=
sudo systemctl restart logstash
```
```bash=
sudo tail -f /var/log/logstash/logstash-plain.log
```
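The Logstash node API on port 9600 can confirm that the worker and batch settings took effect (assuming the default API binding):
```bash
curl -s "http://localhost:9600/_node/pipelines?pretty"
```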
---
## Inserting and querying data
### 1. Insert test data
```bash
for i in {1..100}; do
curl -X POST "http://10.1.1.22:5044" -H 'Content-Type: application/json' -d "{\"message\": \"This is test log entry $i\"}"
done
```
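Note: the `beats` input shown earlier does not speak plain HTTP, so this curl-based test assumes the pipeline exposes an `http` input on port 5044 instead, e.g. (an assumed sketch, not part of the original config):
```shell
input {
  http {
    port => 5044
  }
}
```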
### 1.1 Insert throughput
```python=
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
LOGSTASH_SERVER = "http://10.1.1.22:5044"
NUM_REQUESTS_PER_SECOND = 800
def send_log(entry_number):
data = {"message": f"This is test log entry {entry_number}"}
start_time = time.time()
try:
requests.post(LOGSTASH_SERVER, json=data)
except requests.exceptions.RequestException as e:
pass
end_time = time.time()
return end_time - start_time
def test_logstash_load(num_requests):
total_time = 0
response_times = []
with ThreadPoolExecutor(max_workers=num_requests) as executor:
futures = [executor.submit(send_log, i) for i in range(num_requests)]
for future in as_completed(futures):
response_time = future.result()
response_times.append(response_time)
total_time += response_time
average_time = total_time / num_requests
print(f"Number of requests: {num_requests}")
print(f"Average response time: {average_time:.4f} seconds")
print()
if __name__ == "__main__":
while True:
print(f"Testing with {NUM_REQUESTS_PER_SECOND} requests per second")
start_time = time.time()
test_logstash_load(NUM_REQUESTS_PER_SECOND)
end_time = time.time()
print(f"Total duration for {NUM_REQUESTS_PER_SECOND} requests: {end_time - start_time:.2f} seconds")
time.sleep(1)
```
A single Logstash node (1 core, 8 GB RAM) sustains roughly 500 inserts per second.
### 2. Query the inserted data
```bash
curl -X GET "http://10.1.1.20:9200/log-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match_all": {}
},
"size": 1000
}'
```
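To see how the documents are distributed across the rollover indices:
```bash
curl -X GET "http://10.1.1.20:9200/_cat/indices/log-*?v"
```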
#### 2.1 Query throughput
```python=
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
ELASTICSEARCH_SERVER = "http://10.1.1.27:80"  # IP and port of the nginx proxy in front of Elasticsearch
NUM_REQUESTS_PER_SECOND = 300
QUERY = {
"query": {
"match_all": {}
},
"size": 100 # 每次查詢返回的文檔數量
}
def perform_query(query_number):
start_time = time.time()
try:
response = requests.get(f"{ELASTICSEARCH_SERVER}/log-*/_search", json=QUERY)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error during query {query_number}: {e}")
end_time = time.time()
return end_time - start_time
def test_elasticsearch_query_load(num_requests):
total_time = 0
response_times = []
with ThreadPoolExecutor(max_workers=num_requests) as executor:
futures = [executor.submit(perform_query, i) for i in range(num_requests)]
for future in as_completed(futures):
response_time = future.result()
response_times.append(response_time)
total_time += response_time
average_time = total_time / num_requests
print(f"Number of requests: {num_requests}")
print(f"Average response time: {average_time:.4f} seconds")
print(f"Total duration for {num_requests} requests: {total_time:.2f} seconds")
if __name__ == "__main__":
while True:
print(f"Testing with {NUM_REQUESTS_PER_SECOND} requests per second")
start_time = time.time()
test_elasticsearch_query_load(NUM_REQUESTS_PER_SECOND)
end_time = time.time()
time.sleep(max(0, 1 - (end_time - start_time))) # 確保每秒執行一次
```
Two Elasticsearch nodes (1 core, 8 GB RAM each) handle about 300 queries per second, with each query returning 100 documents.
## Shipping logs to Logstash
### 1. install Filebeat
```bash=
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo sh -c 'echo "deb https://artifacts.elastic.co/packages/8.x/apt stable main" > /etc/apt/sources.list.d/elastic-8.x.list'
sudo apt update
sudo apt install filebeat
```
### 2. Configure Filebeat
```bash=
sudo vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/*.log
output.logstash:
hosts: ["10.1.1.22:5044", "10.1.1.23:5044", "10.1.1.24:5044", "10.1.1.25:5044"]
```
`filebeat.inputs`: defines the log file paths to collect.
`output.logstash`: specifies the addresses of the Logstash nodes.
#### enable and restart
```bash=
sudo filebeat modules enable system
sudo systemctl start filebeat
sudo systemctl enable filebeat
# confirm setting
sudo tail -f /var/log/filebeat/filebeat.log
```
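Filebeat can also check its configuration and its connection to the configured Logstash hosts (optional):
```bash
sudo filebeat test config
sudo filebeat test output
```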
#### testing
```shell=
for i in {1..100}; do
echo "Test log entry $i" >> /var/log/test.log
done
```
#### check elasticsearch data
```bash=
curl -kX GET -u elastic:${pswd} "https://localhost:9200/log-*/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match_all": {}
},
"size": 1000
}'
```
## Routing different log types to dedicated indices
- Switch to rsyslog, which inspects each log message and forwards it to the appropriate Logstash node based on its content; each Logstash node is configured with the index it should write to.
### Install and set up rsyslog to pick a Logstash node based on log content
```bash=
sudo apt-get install rsyslog
sudo apt-get install rsyslog-elasticsearch
```
```shell=
sudo vim /etc/rsyslog.d/50-elasticsearch.conf
module(load="omfwd")
template(name="LogstashFormat" type="string" string="<%PRI%>%TIMESTAMP:::date-rfc3339% %HOSTNAME% %syslogtag%%msg%\n")
if $msg contains "specific" then {
action(
type="omfwd"
target="10.1.1.22"
port="5044"
protocol="tcp"
template="LogstashFormat"
)
} else {
action(
type="omfwd"
target="10.1.1.23"
port="5044"
protocol="tcp"
template="LogstashFormat"
)
}
```
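- Optionally check the rsyslog configuration for syntax errors first:
```bash
sudo rsyslogd -N1
```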
- restart rsyslog
```bash=
sudo systemctl restart rsyslog
```
### set up logstash
```bash=
sudo vim /etc/logstash/conf.d/logstash.conf
```
- for logstash (specific-log)
```shell=
input {
tcp {
port => 5044
type => "syslog"
}
}
output {
elasticsearch {
hosts => ["http://10.1.1.20:9200"]
index => "specific-log-index-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
```
- for default-log
```bash=
sudo vim /etc/logstash/conf.d/logstash.conf
```
```shell=
input {
tcp {
port => 5044
type => "syslog"
}
}
output {
elasticsearch {
hosts => ["http://10.1.1.21:9200"]
index => "default-log-index-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
```
- restart logstash
```bash=
sudo systemctl restart logstash
```
- testing
```bash=
logger "This is a test message for specific index"
logger "This is a test message for default index"
```
- Check that each message landed in the expected index:
```shell=
curl -X GET "http://10.1.1.20:9200/specific-log-index-*/_search?pretty"
curl -X GET "http://10.1.1.21:9200/default-log-index-*/_search?pretty"
```
- create an index template
```shell=
curl -X PUT "http://10.1.1.21:9200/_template/log-template" -H 'Content-Type: application/json' -d'
{
"index_patterns": ["*-log-index-*"],
"settings": {
"index.lifecycle.name": "log-retention-policy",
"index.lifecycle.rollover_alias": "log-alias"
}
}
'
```
- set up the initial indices
```bash=
# make sure no conflicting index already exists
curl -X DELETE "http://10.1.1.21:9200/specific-log-index"
curl -X DELETE "http://10.1.1.21:9200/default-log-index"
# create the initial indices with write aliases
curl -X PUT "http://10.1.1.21:9200/specific-log-index-000001" -H 'Content-Type: application/json' -d'
{
"aliases": {
"specific-log-index": {
"is_write_index": true
}
}
}
'
curl -X PUT "http://10.1.1.21:9200/default-log-index-000001" -H 'Content-Type: application/json' -d'
{
"aliases": {
"default-log-index": {
"is_write_index": true
}
}
}
'
```
- Update the index template (note that for rollover to work, `index.lifecycle.rollover_alias` must match the write alias of each index family, e.g. `specific-log-index` and `default-log-index`, rather than a single shared alias):
```bash=
curl -X PUT "http://10.1.1.21:9200/_template/log-template" -H 'Content-Type: application/json' -d'
{
"index_patterns": ["*-log-alias-*"],
"settings": {
"index.lifecycle.name": "log-retention-policy",
"index.lifecycle.rollover_alias": "log-alias"
}
}
'
```
- confirm the setup works (via rsyslog)
```bash=
logger "This is a test message for specific index"
logger "This is a test message for default index"
```
- check in Elasticsearch
```bash=
curl -X GET "http://10.1.1.20:9200/specific-log-index/_search?pretty"
curl -X GET "http://10.1.1.20:9200/default-log-index/_search?pretty"
# list indices
curl -X GET "http://10.1.1.20:9200/_cat/indices?h=index"
curl -X GET "http://10.1.1.20:9200/_settings?pretty"
curl -X GET "http://10.1.1.20:9200/_mapping?pretty"
```
### Kibana: create index patterns and visualizations via API
The Flask helper below uses the Kibana saved objects API to create an index pattern and a basic line-chart visualization for each requested pattern.
```python=
from flask import Flask, request, jsonify
import requests
import json
app = Flask(__name__)
KIBANA_URL = "http://localhost:5601"  # change this to your Kibana instance address
HEADERS = {"kbn-xsrf": "true", "Content-Type": "application/json"}
@app.route('/create_index_patterns_and_visualizations', methods=['POST'])
def create_index_patterns_and_visualizations():
data = request.json
index_patterns = data.get('index_patterns')
if not index_patterns:
return jsonify({"error": "index_patterns is required"}), 400
results = []
for pattern in index_patterns:
index_pattern = pattern.get('index_pattern')
time_field = pattern.get('time_field', '@timestamp')
if not index_pattern:
results.append({"index_pattern": index_pattern, "error": "index_pattern is required"})
continue
        # Create the index pattern
index_pattern_payload = {
"attributes": {
"title": index_pattern,
"timeFieldName": time_field
}
}
response = requests.post(f"{KIBANA_URL}/api/saved_objects/index-pattern", headers=HEADERS, json=index_pattern_payload)
if response.status_code != 200:
results.append({"index_pattern": index_pattern, "error": "Failed to create index pattern", "details": response.json()})
continue
        # Get the index pattern ID
        index_pattern_id = response.json()['id']
        # Create the visualization
visualization_payload = {
"attributes": {
"title": f"Visualization for {index_pattern}",
"visState": json.dumps({
"title": f"Visualization for {index_pattern}",
"type": "line",
"params": {
"type": "line",
"grid": {
"categoryLines": False,
"style": {
"color": "#eee"
}
}
},
"aggs": [
{
"id": "1",
"enabled": True,
"type": "count",
"schema": "metric",
"params": {}
},
{
"id": "2",
"enabled": True,
"type": "date_histogram",
"schema": "segment",
"params": {
"field": time_field,
"interval": "auto",
"min_doc_count": 1,
"extended_bounds": {}
}
}
],
"listeners": {}
}),
"uiStateJSON": "{}",
"description": "",
"version": 1,
"kibanaSavedObjectMeta": {
"searchSourceJSON": json.dumps({
"index": index_pattern_id,
"query": {
"query": "",
"language": "kuery"
},
"filter": []
})
}
}
}
response = requests.post(f"{KIBANA_URL}/api/saved_objects/visualization", headers=HEADERS, json=visualization_payload)
if response.status_code != 200:
results.append({"index_pattern": index_pattern, "error": "Failed to create visualization", "details": response.json()})
continue
results.append({"index_pattern": index_pattern, "message": "Index pattern and visualization created successfully"})
return jsonify(results), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
```
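A usage sketch for this helper, assuming it runs locally on port 5000:
```bash
curl -X POST "http://localhost:5000/create_index_patterns_and_visualizations" \
  -H 'Content-Type: application/json' \
  -d '{"index_patterns": [{"index_pattern": "log-*", "time_field": "@timestamp"}]}'
```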
---
## Alert v1
A simple polling alert: every 30 seconds, query Elasticsearch for logs indexed in the last 30 seconds and raise an alert for any message containing "500".
```python3=
import requests
import json
import schedule
import time
from datetime import datetime
def send_alert(log):
print("ALERT: Detected 500 error log!")
print(json.dumps(log, indent=4))
def query_elasticsearch():
url = "http://10.1.1.20:9200/logstash-*/_search?pretty"
headers = {
'Content-Type': 'application/json'
}
query = {
"query": {
"range": {
"@timestamp": {
"gte": "now-30s"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
response = requests.get(url, headers=headers, data=json.dumps(query))
if response.status_code == 200:
results = response.json()
hits = results.get('hits', {}).get('hits', [])
if hits:
print(f"Logs from the last minute ({datetime.now()}):")
for hit in hits:
log = hit["_source"]
if "500" in log.get("message", ""):
send_alert(log)
else:
print(f"No logs found in the last minute ({datetime.now()}).")
else:
print(f"Failed to fetch logs: {response.status_code} - {response.text}")
# Schedule the task to run every 30 seconds
schedule.every(30).seconds.do(query_elasticsearch)
print("Starting log query script. Press Ctrl+C to exit.")
# Run the scheduler
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("Script terminated by user.")
```
## DDOS Alert v1
An hourly job that compares the number of logs indexed in the past hour against the historical average for that hour of the day, and alerts when the current count exceeds that average by a configurable factor.
```py
import schedule
import time
import requests
from datetime import datetime, timedelta
ELASTICSEARCH_URL = "http://10.1.1.20:9200"
TIME_FIELD = "@timestamp"
HEADERS = {"Content-Type": "application/json"}
history = {}
counter = {}
rate = 2  # multiplier; tune it to your actual traffic pattern
def send_alert(log):
print("ALERT: Detected DDOS", log)
# Return the number of log entries indexed in the past hour
def get_logs_count_last_hour():
now = datetime.utcnow()
one_hour_ago = now - timedelta(hours=1)
query = {
"query": {
"bool": {
"must": [
{
"range": {
TIME_FIELD: {
"gte": one_hour_ago.isoformat(),
"lte": now.isoformat(),
"format": "strict_date_optional_time"
}
}
}
]
}
}
    }  # Elasticsearch count query body
response = requests.post(f"{ELASTICSEARCH_URL}/logstash-*/_count", headers=HEADERS, json=query)
if response.status_code != 200:
print(f"Failed to query logs count at {now.isoformat()}: {response.json()}")
return
count = response.json().get('count', 0)
return count
def job():
    """
    Get the number of hits from the past hour, compare it with the historical
    average for this hour of the day, and send an alert if it exceeds that
    average by the configured rate. Finally, record this sample.
    """
    c = get_logs_count_last_hour()
    if c is None:
        return
    h = datetime.now().hour
    if counter.get(h, 0) > 0:
        s = history[h] // counter[h]
        if s > 0 and c >= s * rate:
            send_alert(c // s)
    history[h] = history.get(h, 0) + c
    counter[h] = counter.get(h, 0) + 1
# Schedule job to run at the top of every hour
schedule.every().hour.at(":00").do(job)
if __name__ == '__main__':
while True:
schedule.run_pending()
time.sleep(1)
```
## Line Bot
First, register a LINE bot: https://ithelp.ithome.com.tw/m/articles/10215268.
```python
import json
import requests
from flask import Flask, request, jsonify
ChannelAccessToken = '' # Place your access token here.
USER_IDS_FILE = 'user_ids.txt'
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
return 'Hello, World!', 200
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.get_json()
for event in data['events']:
userId = event['source']['userId']
userMessage = event['message']['text']
if userMessage == 'abcd1234': # Place a password here.
print(f"Received special message from user {userId}: {userMessage}")
with open(USER_IDS_FILE, 'a') as f:
f.write(f"{userId}\n")
print(f"Recorded userId: {userId}")
return 'OK', 200
@app.route('/send_alert', methods=['POST'])
def send_alert():
data = request.get_json()
if 'message' not in data:
return jsonify({"error": "Missing 'message' in request body"}), 400
alert_message = data['message']
try:
with open(USER_IDS_FILE, 'r') as f:
user_ids = f.readlines()
user_ids = [uid.strip() for uid in user_ids if uid.strip()]
if not user_ids:
return jsonify({"error": "No user IDs found"}), 404
for userId in user_ids:
payload = {
"to": userId,
"messages": [
{
"type": "text",
"text": alert_message
}
]
}
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {ChannelAccessToken}'
}
response = requests.post('https://api.line.me/v2/bot/message/push', headers=headers, data=json.dumps(payload))
if response.status_code != 200:
print(f"Failed to send message to {userId}: {response.text}")
return jsonify({"status": "Alert sent to all users"}), 200
except FileNotFoundError:
return jsonify({"error": "User ID file not found"}), 404
if __name__ == '__main__':
app.run(host='0.0.0.0', port=80, debug=True)
```
When a user sends the password to the LINE bot, the bot records that user's ID; when an alert is sent, it is pushed to every recorded user.
```python
import requests
import json
def send_alert(message):
api_url = "http://your-server-ip/send_alert"
payload = {
"message": message
}
headers = {
"Content-Type": "application/json"
}
try:
response = requests.post(api_url, headers=headers, data=json.dumps(payload))
print(f"Status Code: {response.status_code}")
print(f"Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
```
## Alert v2
The same polling loop as Alert v1, but without the "500" filter, so every log returned in the 30-second window triggers an alert.
```python3=
import requests
import json
import schedule
import time
from datetime import datetime
def send_alert(log):
print("ALERT: Detected 500 error log!")
print(json.dumps(log, indent=4))
def query_elasticsearch():
url = "http://10.1.1.20:9200/logstash-*/_search?pretty"
headers = {
'Content-Type': 'application/json'
}
query = {
"query": {
"range": {
"@timestamp": {
"gte": "now-30s"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
response = requests.get(url, headers=headers, data=json.dumps(query))
if response.status_code == 200:
results = response.json()
hits = results.get('hits', {}).get('hits', [])
if hits:
print(f"Logs from the last minute ({datetime.now()}):")
for hit in hits:
log = hit["_source"]
send_alert(log)
else:
print(f"No logs found in the last minute ({datetime.now()}).")
else:
print(f"Failed to fetch logs: {response.status_code} - {response.text}")
# Schedule the task to run every 30 seconds
schedule.every(30).seconds.do(query_elasticsearch)
print("Starting log query script. Press Ctrl+C to exit.")
# Run the scheduler
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("Script terminated by user.")
```
---
### Setting up Kibana Rules
Click the menu on the left:

Select Stack Management:

Select Rules and Insights:

Create Rule:

Choose Elasticsearch Query at the bottom:

You can now edit the query:

For details, see the [Doc](https://www.elastic.co/guide/en/kibana/current/rule-type-es-query.html).