# ELK note
###### tags: `work`
> related: ElasticSearch, Logstash, Kibana, Filebeat, APM
## Docker images
> [Running the Elastic Stack ("ELK") on Docker](https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-stack-docker.html)
> [Running Logstash on Docker](https://www.elastic.co/guide/en/logstash/current/docker.html)
### OSS version
[ElasticSearch](https://www.docker.elastic.co/r/elasticsearch/elasticsearch-oss)
[Logstash](https://www.docker.elastic.co/r/logstash/logstash-oss)
[Kibana](https://www.docker.elastic.co/r/kibana/kibana-oss)
[Filebeat](https://www.docker.elastic.co/r/beats/filebeat-oss)
[APM](https://www.docker.elastic.co/r/apm/apm-server-oss)
### docker compose file
:::spoiler show
```yaml=
version: "3.7"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
container_name: elasticsearch
hostname: elasticsearch
volumes:
- esdata:/usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"
environment:
- discovery.type=single-node
kibana:
image: docker.elastic.co/kibana/kibana-oss:7.10.2
container_name: kibana
hostname: kibana
ports:
- "5601:5601"
environment:
SERVERNAME: kibana
ELASTICSEARCH_URL: http://elasticsearch:9200
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
logstash:
image: docker.elastic.co/logstash/logstash-oss:7.12.0
container_name: logstash
links:
- elasticsearch
volumes:
- /Users/kent_chen/Storage/docker-volume/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- "5044:5044"
filebeat:
image: docker.elastic.co/beats/filebeat-oss:7.12.0
container_name: filebeat
links:
- logstash
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
- /Users/kent_chen/Storage/docker-volume/logstash/log:/usr/share/filebeat/log:ro
volumes:
esdata:
```
:::
### logstash.conf
:::spoiler show
```=
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
  }
}
filter {
  grok {
    match => {
      "message" => "(?<timestamp>.*)\|(?<level>.*)\|(?<thread>.*)\|(?<class>.*)\|(?<bundle>.*)\|(?<msg>.*)"
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
```
:::
### filebeat.yml
:::spoiler show
```yaml=
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /usr/share/filebeat/log/gc-*.log
      - /usr/share/filebeat/log/karaf*.log
output.logstash:
  hosts: ["logstash:5044"]
```
:::
---
## Logstash
### File input
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html)
#### Read old files
If the file you want to tail already has existing contents, set `start_position` to `"beginning"`.
```
start_position => "beginning"
```
The `sincedb_path` setting points to a file that stores the reading position of each watched file.
By default, `start_position` is `"end"`: for a file that has never been read (no entry in the sincedb), Logstash starts reading at the end, like `tail -f`. With `"beginning"`, Logstash reads the file from the start, and once it has caught up it behaves the same as `"end"`.
For example, given a 10-line file that Logstash has never read, `"end"` starts reading from line 11, while `"beginning"` starts from line 1.
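A minimal sketch of a file input that re-reads existing content (the paths here are hypothetical; by default the sincedb files live under the Logstash data directory):
```
input {
  file {
    # hypothetical log path
    path => ["/usr/share/logstash/logs/old-*.log"]
    # read existing content instead of only new lines
    start_position => "beginning"
    # hypothetical sincedb location
    sincedb_path => "/usr/share/logstash/data/sincedb-old-logs"
  }
}
```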
### grok filter
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html)
If grok fails to match, the event will be tagged with `_grokparsefailure` in its `tags` field.
:::success
Therefore, if you don't want to send events that failed to parse, simply add a condition in the output.
```
output {
if "_grokparsefailure" not in [tags] {
elasticsearch {...}
}
}
```
:::
The grok filter uses regular expressions to match each message, and the named-capture syntax `(?<{field_name}>{regex})` saves the matched string as a field.
e.g.,
```=
filter {
  grok {
    match => {
      "message" => "(?<timestamp>.*)\|(?<msg>.*)"
    }
  }
}
```
There's a useful tool for debugging grok: [Grok Debugger](https://grokdebug.herokuapp.com/).
#### Built-in grok pattern
> [list](https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns)
Use `%{<pattern>:<field>}`. e.g., `%{INT:count}`.
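For example, a minimal sketch that parses a hypothetical `<ISO timestamp> <level> <message>` line with the built-in `TIMESTAMP_ISO8601`, `LOGLEVEL`, and `GREEDYDATA` patterns:
```
filter {
  grok {
    match => {
      # e.g. "2022-03-05T13:25:12.121 INFO something happened"
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}"
    }
  }
}
```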
### Date filter
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html)
The date filter is used for parsing dates from fields, and then using that date or timestamp as the logstash timestamp for the event.
:::warning
If the date matching failed and you can't recognize what's wrong, maybe your timestamp string has a whitespace at the end.
:::
* Sample
```=
filter {
  grok {
    match => {
      "message" => "(?<timestamp>.*)\|(?<level>.*)\|(?<thread>.*)\|(?<class>.*)\|(?<bundle>.*)\|(?<msg>.*)"
    }
  }
  date {
    match => [ "timestamp", "ISO8601" ]
    remove_field => ["timestamp"]
    timezone => "Asia/Taipei"
    target => "@timestamp"
  }
}
```
* [match](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match)
    * `[<field>, <pattern>...]`
    * Multiple patterns supported.
    * Supports `ISO8601`, `UNIX`, `UNIX_MS`, `TAI64N`
* [remove_field](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-remove_field)
    * Remove these fields if the date is matched successfully.
* [target](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-target)
    * Store the matched timestamp into the given target field. Default: `@timestamp`
* [timezone](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-timezone)
    * Time zone of the source timestamp. Default is UTC+0.
    * [Supported time zones](http://joda-time.sourceforge.net/timezones.html)
### Fingerprint Filter
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-fingerprint.html)
Fingerprint filter creates consistent hashes with one or more fields.
e.g.,
```
fingerprint {
  concatenate_sources => true
  source => ["timestamp", "msg"]
  method => "MD5"
}
```
In this example, the fingerprint filter uses `timestamp` and `msg` to create an MD5 hash; if another event has the same values in the `timestamp` and `msg` fields, its hash will be identical.
Therefore, you can use a fingerprint to prevent unexpected duplicate events from being stored in Elasticsearch by simply using the fingerprint as the document ID. A document with the same document ID updates the existing document instead of creating a new one.
```
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "karaf-%{+YYYY.MM.dd}"
    document_id => "%{fingerprint}"
  }
}
```
### input gzip
> [ref](https://github.com/elastic/logstash/issues/10194)
Gzip files are only supported when the file input `mode` is `read` (the default is `tail`), which means Logstash won't watch those files for further changes after reading them.
```=
input {
  file {
    path => ["/usr/share/logstash/logs/karaf*.log.gz"]
    start_position => "beginning"
    type => "karaf"
    mode => "read"
  }
}
```
### multiple input, filter, output
The config file `logstash.conf` supports if-else statements. Using the input's `type` for these conditions is recommended, though you can also use `path` as a condition.
e.g.,
```=
input {
  file {
    path => ["/path/to/first/sys-*.log"]
    type => "system"
  }
  file {
    path => ["/path/to/second/data-*.log"]
    type => "data"
  }
}
filter {
  if [type] == "system" {
    grok {
      match => {...}
    }
  } else if [type] == "data" {
    grok {
      match => {...}
    }
  }
}
output {
  if [type] == "system" {
    elasticsearch {...}
  } else if [type] == "data" {
    elasticsearch {...}
  }
}
```
### Log with multiline
> [Multiline codec plugin](https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html)
With multiline codec plugin, you can merge multiple lines into a single event.
e.g.,
```=
input {
  file {
    path => ["/usr/share/logstash/proj_log/sys*.log"]
    codec => multiline {
      pattern => "^\D"
      what => "previous"
    }
  }
}
```
* `pattern`: Any line that matches this pattern is considered part of a multi-line event.
* `what`: must be `"previous"` or `"next"`; indicates whether a matching line belongs to the previous or the next line.
Take this setting for example. If there's a log:
```=
2022-03-05T13:25:12.121 AAAAAA
2022-03-05T13:25:12.542 BBBBBB
CCCCCC
DDDDDD
2022-03-05T13:25:12.542 EEEEEE
```
Lines 2~4 will be treated as a single event.
If `what` is set to `next`, lines 3~5 will form a single event.
### filter success & failure
There are several settings that control what happens when the filter succeeds or fails.
For example, `tag_on_failure` adds tags to events that the filter failed to process, and `add_tag` adds tags when the filter succeeds.
One of the most common uses is `remove_field` in the grok filter to drop the `message` field, which stores the raw data of the event, after a successful match.
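A minimal sketch combining these options (the tag names are just examples):
```
filter {
  grok {
    match => {
      "message" => "(?<timestamp>.*)\|(?<msg>.*)"
    }
    # on success: tag the event and drop the raw message field
    add_tag => ["parsed"]
    remove_field => ["message"]
    # on failure: use a custom tag instead of the default _grokparsefailure
    tag_on_failure => ["karaf_grok_failed"]
  }
}
```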
### logstash.conf auto update
> [ref](https://www.elastic.co/guide/en/logstash/current/reloading-config.html)
By default, you have to restart Logstash to reload `logstash.conf`; automatic reloading has to be enabled explicitly.
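Automatic reload can be enabled with the `--config.reload.automatic` flag (or `config.reload.automatic: true` in `logstash.yml`); a sketch, with the interval value chosen arbitrarily:
```shell
bin/logstash -f /usr/share/logstash/pipeline/logstash.conf \
  --config.reload.automatic \
  --config.reload.interval 10s
```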
---
## ElasticSearch
### Monitoring ElasticSearch JVM
```
http://<es-ip>:9200/_nodes/stats/jvm
```
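For example, from a shell (the `pretty` query parameter just makes the JSON readable):
```shell
curl -s "http://<es-ip>:9200/_nodes/stats/jvm?pretty"
```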
### Use Snapshot for exporting and importing
> [ref](https://kifarunix.com/restore-elasticsearch-snapshot-to-another-cluster/)
> [Elastic Discuss](https://discuss.elastic.co/t/proper-way-to-dump-indices-from-elasticsearch-and-import-to-another-elasticsearch-instance/310938)
#### Export
1. Set up ElasticSearch and the docker compose file
    1. Add `path.repo` to `<ES_HOME>/config/elasticsearch.yml`. e.g.,
        ```yaml
        path.repo: "/usr/share/elasticsearch/backup"
        ```
    2. Mount elasticsearch.yml in `docker-compose.yml`
        ```yaml
        volumes:
          - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
        ```
    3. (Optional) Mount the backup folder in `docker-compose.yml`
        ```yaml
        volumes:
          - esbackup:/usr/share/elasticsearch/backup
        ```
2. Register repository with [Create or update snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-snapshot-repo-api.html)
```
PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
```
with body:
```json
{
  "type": "fs",
  "settings": {
    "location": "/usr/share/elasticsearch/backup"
  }
}
```
:::danger
If creating a snapshot repository fails, check the user and group ownership of the snapshot repository folder (with `ls -al`).
Use `chown <user>:<group> <file/folder>` to modify the ownership of the folder.
:::
3. Create snapshot with [Create snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/create-snapshot-api.html)
```
PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>
```
You can specify which indices to export in the request body (see the curl sketch after this list):
```json
{
"indices": ["index1", "index2"]
}
```
4. Unregister snapshot repository with [Delete snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-snapshot-repo-api.html)
```
DELETE http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
```
5. Copy all files inside the snapshot repository to a local path with `docker cp`. That's all!
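For reference, a sketch of steps 2 and 3 as curl commands; the repository name `my_backup` and snapshot name `snapshot_1` are hypothetical:
```shell
# step 2: register the filesystem repository
curl -X PUT "http://<ES_HOST>:9200/_snapshot/my_backup" \
  -H 'Content-Type: application/json' \
  -d '{"type": "fs", "settings": {"location": "/usr/share/elasticsearch/backup"}}'

# step 3: create the snapshot and wait until it finishes
curl -X PUT "http://<ES_HOST>:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true" \
  -H 'Content-Type: application/json' \
  -d '{"indices": ["index1", "index2"]}'
```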
#### Import
1. If the repository is registered, unregister it with [Delete snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-snapshot-repo-api.html)
```
DELETE http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
```
2. Delete all files inside the snapshot repository folder (e.g. using `docker exec`), and copy the exported files into this folder (with `docker cp`).
:::danger
The copied files might have the wrong user and group ownership; make sure they have the right one.
* check with `ls -al`
* recursively modify the ownership of the snapshot repository folder
```
chown -R <user>:<group> /usr/share/elasticsearch/backup
```
:::
3. Register repository with [Create or update snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-snapshot-repo-api.html)
```
PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
```
with body:
```json
{
  "type": "fs",
  "settings": {
    "location": "/usr/share/elasticsearch/backup"
  }
}
```
4. Use [Get snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-api.html) to check if the snapshot is loaded.
```
GET http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>
```
5. Use [Restore snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/restore-snapshot-api.html) to restore (import).
```
POST http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>/_restore
```
You can specify which indices to restore in the request body
```json
{
"indices": ["index1", "index2"]
}
```
### Use Search/Bulk REST API for exporting and importing
> This is the recommended method if you want to export an index in a custom or human-readable format.
:::warning
Caution: unfinished section
:::
#### Export
Use [Search API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html)
or [Sliced scroll](https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#slice-scroll)
```
GET /<target>/_search
```
* Path parameter
* `target`: Target index; supports wildcards (`*`).
* Query parameters
* `from`: Starting document offset. Defaults to `0`.
* `size`: Number of hits to return. Defaults to `10`; the maximum is `10000`. (A minimal request is sketched below.)
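A minimal sketch that pulls the first 1000 documents of a hypothetical `karaf-*` index pattern:
```
GET /karaf-*/_search?from=0&size=1000
```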
#### Import
Use [Document API/Index API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html)
```
POST /<target>/_doc
```
* Body
```json
{
  "@timestamp": "2022-05-04T12:31:05.113",
  "field1": "value1",
  "field2": "value2"
}
```
Or use the [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) to import multiple documents at once.
> [Other tutorial](https://kucw.github.io/blog/2018/7/elasticsearch-bulk/)
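A minimal sketch of a bulk request body (NDJSON: one action line followed by one document line, ending with a newline); the index and field names are hypothetical:
```
POST /_bulk
{ "index": { "_index": "karaf-2022.05.04" } }
{ "@timestamp": "2022-05-04T12:31:05.113", "field1": "value1" }
{ "index": { "_index": "karaf-2022.05.04" } }
{ "@timestamp": "2022-05-04T12:31:06.224", "field1": "value2" }
```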
### Use elasticsearch-dump for index exporting and importing
:::warning
This is a third-party tool; it can help you export the index in several formats.
Requires Node.js runtime environment.
:::
Use [elasticsearch-dump](https://github.com/elasticsearch-dump/elasticsearch-dump) to dump index.
`git clone` the repository:
```shell
git clone https://github.com/elasticsearch-dump/elasticsearch-dump.git
```
install with npm:
```shell
cd elasticsearch-dump
npm install elasticdump
```
#### Dump index
:::danger
The file path must be an absolute path.
:::
```shell
./bin/elasticdump \
--input=http://<elasticsearch IP>:<port>/<index name> \
--output=<file path>/<file name>.json
```
#### Import index
:::danger
The file path must be an absolute path.
:::
```shell
./bin/elasticdump \
--input=<file path>/<file name>.json \
--output=http://<elasticsearch IP>:<port>/<index name>
```
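Per the project README, the `--type` option selects what to dump (`data` is the default; `mapping` is another common value). A sketch with hypothetical host, index, and file names:
```shell
./bin/elasticdump \
  --input=http://192.168.12.34:9200/karaf-2022.06.14 \
  --output=/tmp/karaf-2022.06.14_mapping.json \
  --type=mapping
./bin/elasticdump \
  --input=http://192.168.12.34:9200/karaf-2022.06.14 \
  --output=/tmp/karaf-2022.06.14_data.json \
  --type=data
```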
---
## Kibana
### Dev Tools
> Dev Tools can help you manage the data stored in ElasticSearch.
* delete an index (supports wildcards):
```
DELETE <index>
```
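A few other requests that are handy in Dev Tools (standard Elasticsearch APIs; the index names are just examples):
```
GET _cat/indices?v
GET karaf-*/_search?size=10
DELETE karaf-2022.06.14
```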
---
## Filebeat
> [Filebeat quick start: installation and configuration](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html)
### Add custom field
> [Add tags](https://www.elastic.co/guide/en/beats/filebeat/current/add-tags.html)
`tags` is a list whose items become the value of the `target` field.
`target` is optional; the default is `tags`.
```yaml=
processors:
  - add_tags:
      tags: ["custom-tag"]
      target: "custom-field"
```
---
## APM (Application Performance Management)
The OSS version of Kibana doesn't support the APM dashboard.
### Java Agent
[apm-agent-java (GitHub)](https://github.com/elastic/apm-agent-java)
Official testing web app: [opbeans](https://github.com/elastic/opbeans-java)
### Run Karaf with Java agent
> https://www.elastic.co/guide/en/apm/guide/7.17/apm-quick-start.html#add-apm-integration-agents
Set `KARAF_OPTS` as:
```shell
export KARAF_OPTS="-javaagent:<java agent path> -Delastic.apm.service_name=<service name> -Delastic.apm.server_urls=<APM server url>"
```
* `-javaagent`: Path of the Java agent; should be a .jar file.
* `-Delastic.apm.service_name`: In Kibana, this becomes the `service.name` field.
* `-Delastic.apm.server_urls`: URL of the APM server.
and then run Karaf.
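A concrete sketch with purely hypothetical values for the agent path, service name, and APM server URL:
```shell
# hypothetical agent path, service name, and APM server URL
export KARAF_OPTS="-javaagent:/opt/apm/elastic-apm-agent.jar -Delastic.apm.service_name=karaf-demo -Delastic.apm.server_urls=http://192.168.12.34:8200"
./bin/karaf
```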
### APM fields
> [ref](https://www.elastic.co/guide/en/apm/guide/current/exported-fields.html)
* `transaction.name`: Generic designation of a transaction in the scope of a single service.
> Seems like it's the method being used?
### Tuning Agent
https://www.elastic.co/guide/en/apm/agent/java/master/tuning-and-overhead.html
---
## Use Case: Collect Logs from Remote VM
> The server at "center" will collect the logs from itself and from the remote machines. That is, the `input` of Logstash will contains `beats` and `file`.

### Center
> The logs from "center" will send to Logstash directly.
1. Prepare docker images
    * [docker.elastic.co/kibana/kibana-oss:7.10.2](https://www.docker.elastic.co/r/kibana/kibana-oss)
    * [docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2](https://www.docker.elastic.co/r/elasticsearch/elasticsearch-oss)
    * [docker.elastic.co/logstash/logstash-oss:7.12.0](https://www.docker.elastic.co/r/logstash/logstash-oss)
    * [docker.elastic.co/apm/apm-server-oss:7.12.0](https://www.docker.elastic.co/r/apm/apm-server-oss)
2. Prepare a docker compose file
3. Prepare logstash.conf
    * `input`: `beats`, `file`
    * Prepare regular expressions for the messages
4. Prepare apm-server.docker.yml
5. Prepare Java agent (for APM)
6. (optional) Prepare a script for fast deployment
### Remote machine
> These remote machines will send logs to Logstash of "Center" with Filebeat.
1. Prepare docker images
    * [docker.elastic.co/beats/filebeat-oss:7.12.0](https://www.docker.elastic.co/r/beats/filebeat-oss)
2. Prepare a docker compose file or a `docker run` command
3. Prepare filebeat.yml
4. (optional) Prepare a script for fast deployment
### Sample: logstash.conf at "Center"
Let's make it more complicated, assuming that "Center" runs an Apache Karaf service which:
* Write Karaf log at runtime
* Write GC (Garbage Collection) log at runtime
* Has some existing Karaf logs zipped as gzip files (.gz)
* Has some existing GC logs
* Will receive logs sent by remote Filebeat
* Run at IP 192.168.12.34
Advanced requirements:
* If grok parsing failed, don't send to ElasticSearch.
* The value of each field should start and end with a non-whitespace character (`\S`).
    * regex sample for this requirement: `\s*(?<field>\S.+\S)\s*`
Karaf log sample:
```
2022-06-14T15:22:02,009 | DEBUG | ELK-Test-Thread | ELK-Test-Remote-logger | 999 - com.example.elk.test - 2.5.0.M18 | random message
```
GC log sample:
```
[2022-06-02T09:31:24.655-0800][0.074s][info ][gc ] Using G1
```
Remote log sample (uses an unusual timestamp format):
```
2022-06-14 15:22:02.0192||DEBUG| ELK-Test-Remote-logger || random message
```
:::warning
Note that this timestamp doesn't match ISO 8601, so we have to define a custom format.
Besides, since the date filter's maximum precision is milliseconds (`SSS`), this timestamp should not include the last digit (the `2` of `.0192`). We can deal with it using a regular expression (as the sample below does) or the [ruby filter](https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html) (a sketch follows this note).
:::
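A sketch of the ruby filter alternative, assuming grok captured the full sub-second value into `timestamp` (the full sample below uses the regex trick instead):
```
filter {
  if [type] == "remote" {
    # drop the trailing digit so the timestamp fits millisecond precision
    ruby {
      code => 'event.set("timestamp", event.get("timestamp")[0..-2])'
    }
  }
}
```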
logstash.conf sample:
```=
input {
  file {
    path => ["/usr/share/logstash/centerlog/karaf*.log"]
    start_position => "beginning"
    type => "karaf"
    mode => "tail"
  }
  file {
    path => ["/usr/share/logstash/centerlog/karaf*.log.gz"]
    start_position => "beginning"
    type => "karaf"
    mode => "read"
  }
  file {
    path => ["/usr/share/logstash/centerlog/gc*.log"]
    start_position => "beginning"
    type => "gc"
    mode => "tail"
  }
  beats {
    port => 5044
    type => "remote"
  }
}
filter {
  if [type] == "karaf" {
    grok {
      match => {
        "message" => "(?<timestamp>\S.+\S)\s*\|\s*(?<level>\S.+\S)\s*\|\s*(?<thread>\S.+\S)\s*\|\s*(?<logger>\S.+\S)\s*\|\s*(?<bundle>\S.+\S)\s*\|\s*(?<msg>\S.+\S)\s*"
      }
    }
  } else if [type] == "gc" {
    grok {
      match => {
        "message" => "\[\s*(?<timestamp>\S.+\S)\s*\]\[\s*(?<time>\S.+\S)\s*\]\[\s*(?<level>\S.+\S)\s*\]\[\s*(?<label>\S.+\S)\s*\]\s*(?<msg>\S.+\S)\s*"
      }
    }
  } else if [type] == "remote" {
    grok {
      match => {
        "message" => "(?<timestamp>\S.+\S)\S\s*\|\|\s*(?<level>\S.+\S)\s*\|\s*(?<logger>\S.+\S)\s*\|\|\s*(?<msg>\S.+\S)\s*"
      }
    }
  }
  date {
    match => [ "timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss.SSS" ]
    remove_field => ["timestamp"]
    timezone => "Asia/Taipei"
  }
}
output {
  if "_grokparsefailure" not in [tags] {
    if [type] == "karaf" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "karaf-%{+YYYY.MM.dd}"
      }
    } else if [type] == "gc" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "gc-%{+YYYY.MM.dd}"
      }
    } else if [type] == "remote" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "remote-%{+YYYY.MM.dd}"
      }
    }
  }
}
```
### Sample: filebeat.yml at "Remote"
```yaml=
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /usr/share/filebeat/remotelog/remote*.log
output.logstash:
  hosts: ["192.168.12.34:5044"]
```
---
## Notes
```graphviz
digraph parent{
rankdir=LR;
st[shape=rect label="other input types"]
file[shape=square]
fb[label=Filebeat]
mb[label=Metricbeat]
ls[label=Logstash]
otheropt[shape=rect label="other output types"]
es[label=ElasticSearch]
k[label=Kibana]
apm[label=APM]
file->fb
fb->ls[label=input]
st->ls[label=input]
file->ls[label=input]
ls->otheropt[label=output]
mb->es
apm->es[label="Java agent"]
ls->es[label=output]
es->k
}
```