# ELK note
###### tags: `work`

> related: ElasticSearch, Logstash, Kibana, Filebeat, APM

## Docker images
> [Running the Elastic Stack ("ELK") on Docker](https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-stack-docker.html)
> [Running Logstash on Docker](https://www.elastic.co/guide/en/logstash/current/docker.html)

### OSS version
[ElasticSearch](https://www.docker.elastic.co/r/elasticsearch/elasticsearch-oss)
[Logstash](https://www.docker.elastic.co/r/logstash/logstash-oss)
[Kibana](https://www.docker.elastic.co/r/kibana/kibana-oss)
[Filebeat](https://www.docker.elastic.co/r/beats/filebeat-oss)
[APM](https://www.docker.elastic.co/r/apm/apm-server-oss)

### docker compose file
:::spoiler show
```yaml=
version: "3.7"
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
    container_name: elasticsearch
    hostname: elasticsearch
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      - discovery.type=single-node
  kibana:
    image: docker.elastic.co/kibana/kibana-oss:7.10.2
    container_name: kibana
    hostname: kibana
    ports:
      - "5601:5601"
    environment:
      SERVERNAME: kibana
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
  logstash:
    image: docker.elastic.co/logstash/logstash-oss:7.12.0
    container_name: logstash
    links:
      - elasticsearch
    volumes:
      - /Users/kent_chen/Storage/docker-volume/logstash/pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - "5044:5044"
  filebeat:
    image: docker.elastic.co/beats/filebeat-oss:7.12.0
    container_name: filebeat
    links:
      - logstash
    volumes:
      - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /Users/kent_chen/Storage/docker-volume/logstash/log:/usr/share/filebeat/log:ro
volumes:
  esdata:
```
:::

### logstash.conf
:::spoiler show
```=
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
  }
}
filter {
  grok {
    match => {
      "message" =>
"(?<timestamp>.*)\|(?<level>.*)\|(?<thread>.*)\|(?<class>.*)\|(?<bundle>.*)\|(?<msg>.*)" } } } output { elasticsearch { hosts => ["elasticsearch:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } } ``` ::: ### filebeat.yml :::spoiler show ```yaml= filebeat.inputs: - type: log enabled: true paths: - /usr/share/filebeat/log/gc-*.log - /usr/share/filebeat/log/karaf*.log output.logstash: hosts: ["logstash:5044"] ``` ::: --- ## Logstash ### File input > [ref](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html) #### Read old files If the file you want to tail read has some existing contents, set `start_position` as `"beginning"`. ``` start_position => "beginning" ``` There's a file path called `sincedb_path` that stores the reading status of files. By default, `start_position` is `"end"`, which make Logstash read the un-read file (no related data at sincedb) start at the end, like `tail -f` this file, and if `start_position` is `"beginning"`, Logstash will read this file from the beginning, and after completely read, it will have the same behavior as `"end"`. Take a 10 line file (never be read by Logstash) for example, `"end"` will start reading from line 11, and `"beginning"` will start reading from line 1. ### grok filter > [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html) if grok failed, the message will have a `tags`: `_grokparsefailure`. :::success Therefore, if you don't want to send those messages that are failed to be parsed, simply add a condition at output. ``` output { if "_grokparsefailure" not in [tags] { elasticsearch {...} } } ``` ::: grok filter use regular expression to match each message, and use a pattern `(?<{field_name}>{regex})` for saving the matched string as a field. e.g., ```= filter { grok { match => { "message" => "(?<timestamp>.*)\|(?<msg>.*)" } } } ``` There's a useful tool for debugging grok: [Grok Debugger](https://grokdebug.herokuapp.com/). 

#### Built-in grok pattern
> [list](https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns)

Use `%{<pattern>:<field>}`. e.g., `%{INT:count}`.

### Date filter
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html)

The date filter parses a date from a field and uses it as the Logstash timestamp of the event.

:::warning
If date matching fails and you can't see what's wrong, check whether your timestamp string has trailing whitespace.
:::

* Sample
    ```=
    filter {
      grok {
        match => { "message" => "(?<timestamp>.*)\|(?<level>.*)\|(?<thread>.*)\|(?<class>.*)\|(?<bundle>.*)\|(?<msg>.*)" }
      }
      date {
        match => [ "timestamp", "ISO8601" ]
        remove_field => ["timestamp"]
        timezone => "Asia/Taipei"
        target => "@timestamp"
      }
    }
    ```
* [match](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match)
    * `[<field>, <pattern>...]`
    * Multiple patterns are supported; they are tried in order.
    * Besides custom format strings, the keywords `ISO8601`, `UNIX`, `UNIX_MS`, and `TAI64N` are supported.
* [remove_field](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-remove_field)
    * Removes these fields if the timestamp is matched.
* [target](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-target)
    * Stores the matched timestamp in the given target field. Default: `@timestamp`
* [timezone](https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-timezone)
    * The time zone of the timestamp being parsed. If not set, the platform default is used (typically UTC inside a Docker container).
    * [Supported time zone](http://joda-time.sourceforge.net/timezones.html)

### Fingerprint Filter
> [ref](https://www.elastic.co/guide/en/logstash/current/plugins-filters-fingerprint.html)

The fingerprint filter creates a consistent hash from one or more fields.
e.g.,
```
fingerprint {
  concatenate_sources => true
  source => ["timestamp", "msg"]
  method => "MD5"
}
```
In this example the fingerprint filter hashes `timestamp` and `msg` with the MD5 algorithm. Any other event with the same values in `timestamp` and `msg` gets the same hash.

Therefore, you can prevent unexpected duplicate events from being stored in Elasticsearch by using the fingerprint as the document ID. A document with an existing document ID updates that document instead of creating a new one.
```
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "karaf-%{+YYYY.MM.dd}"
    document_id => "%{fingerprint}"
  }
}
```

### input gzip
> [ref](https://github.com/elastic/logstash/issues/10194)

Gzip files are only supported when `mode` is `read` (the default is `tail`), which means Logstash won't watch those files for later changes.
```=
input {
  file {
    path => ["/usr/share/logstash/logs/karaf*.log.gz"]
    start_position => "beginning"
    type => "karaf"
    mode => "read"
  }
}
```

### multiple input, filter, output
The config file `logstash.conf` supports if-else statements. Using the `type` of the input in these conditions is recommended, but you can also use `path`. e.g.,
```=
input {
  file {
    path => ["/path/to/first/sys-*.log"]
    type => "system"
  }
  file {
    path => ["/path/to/second/data-*.log"]
    type => "data"
  }
}
filter {
  if [type] == "system" {
    grok {
      match => {...}
    }
  } else if [type] == "data" {
    grok {
      match => {...}
    }
  }
}
output {
  if [type] == "system" {
    elasticsearch {...}
  } else if [type] == "data" {
    elasticsearch {...}
  }
}
```

### Log with multiline
> [Multiline codec plugin](https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html)

With the multiline codec plugin, you can merge multiple lines into a single event.
e.g.,
```=
input {
  file {
    path => ["/usr/share/logstash/proj_log/sys*.log"]
    codec => multiline {
      pattern => "^\D"
      what => "previous"
    }
  }
}
```
* `pattern`: Any line that matches this pattern is treated as part of a multi-line event.
* `what`: Must be `"previous"` or `"next"`. It indicates whether a matching line belongs to the previous or the next line.

Take this setting for example. If there's a log:
```=
2022-03-05T13:25:12.121 AAAAAA
2022-03-05T13:25:12.542 BBBBBB
CCCCCC
DDDDDD
2022-03-05T13:25:12.542 EEEEEE
```
Lines 2~4 will be treated as a single event. If `what` is set to `next`, lines 3~5 will be a single event.

### filter success & failure
Several settings control what happens when a filter succeeds or fails. For example, `tag_on_failure` adds tags to events that the filter failed on, and `add_tag` adds tags when the filter succeeds.

One of the most common uses is `remove_field` in the grok filter, to remove the `message` field that stores the raw data of an event once it has been parsed.

### logstash.conf auto update
> [ref](https://www.elastic.co/guide/en/logstash/current/reloading-config.html)

By default, you have to restart Logstash to reload `logstash.conf`. To reload it automatically, start Logstash with `--config.reload.automatic` (or set `config.reload.automatic: true` in `logstash.yml`).

---

## ElasticSearch
### Monitoring ElasticSearch JVM
```
http://<es-ip>:9200/_nodes/stats/jvm
```

### Use Snapshot for exporting and importing
> [ref](https://kifarunix.com/restore-elasticsearch-snapshot-to-another-cluster/)
> [Elastic Discuss](https://discuss.elastic.co/t/proper-way-to-dump-indices-from-elasticsearch-and-import-to-another-elasticsearch-instance/310938)

#### Export
1. Set up ElasticSearch and the docker compose file
    1. Add `path.repo` to `<ES_HOME>/config/elasticsearch.yml`. e.g.,
        ```yaml
        path.repo: "/usr/share/elasticsearch/backup"
        ```
    2. Mount elasticsearch.yml with `docker-compose.yml`
        ```yaml
        volumes:
          - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
        ```
    3.
       (Optional) Mount the backup folder, in addition to elasticsearch.yml, with `docker-compose.yml`
        ```yaml
        volumes:
          - esbackup:/usr/share/elasticsearch/backup
        ```
2. Register the repository with [Create or update snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-snapshot-repo-api.html)
    ```
    PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
    ```
    with body:
    ```json
    {
        "type": "fs",
        "settings": {
            "location": "/usr/share/elasticsearch/backup"
        }
    }
    ```
    :::danger
    If creating the snapshot repository fails, check the user and group ownership of the snapshot repository folder (with `ls -al`).
    Use `chown <user>:<group> <file/folder>` to change the ownership of the folder.
    :::
3. Create a snapshot with [Create snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/create-snapshot-api.html)
    ```
    PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>
    ```
    You can specify which indices to export with the request body
    ```json
    {
        "indices": ["index1", "index2"]
    }
    ```
4. Unregister the snapshot repository with [Delete snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-snapshot-repo-api.html)
    ```
    DELETE http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
    ```
5. Copy all files inside the snapshot repository to a local path with `docker cp`. That's all!

#### Import
1. If the repository is registered, unregister it with [Delete snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-snapshot-repo-api.html)
    ```
    DELETE http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
    ```
2. Delete all files inside the snapshot repository folder (for example with `docker exec`), and copy all exported files into this folder (with `docker cp`).
    :::danger
    Copied files might have the wrong user and group ownership. Make sure it is correct.
    * check with `ls -al`
    * recursively change the ownership of the snapshot repository folder
        ```
        chown -R <user>:<group> /usr/share/elasticsearch/backup
        ```
    :::
3. Register the repository with [Create or update snapshot repository API](https://www.elastic.co/guide/en/elasticsearch/reference/current/put-snapshot-repo-api.html)
    ```
    PUT http://<ES_HOST>:9200/_snapshot/<snapshot repository name>
    ```
    with body:
    ```json
    {
        "type": "fs",
        "settings": {
            "location": "/usr/share/elasticsearch/backup"
        }
    }
    ```
4. Use [Get snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/get-snapshot-api.html) to check that the snapshot is visible in the repository.
    ```
    GET http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>
    ```
5. Use [Restore snapshot API](https://www.elastic.co/guide/en/elasticsearch/reference/current/restore-snapshot-api.html) to restore (import).
    ```
    POST http://<ES_HOST>:9200/_snapshot/<snapshot repository name>/<snapshot name>/_restore
    ```
    You can specify which indices to restore with the request body
    ```json
    {
        "indices": ["index1", "index2"]
    }
    ```

### Use Search/Bulk REST API for exporting and importing
> This is the recommended method if you want to export the index in a custom or readable format.

:::warning
Caution: unfinished section
:::

#### Export
Use [Search API](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-search.html) or [Sliced scroll](https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#slice-scroll)
```
GET /<target>/_search
```
* Path parameter
    * `target`: Target index; supports wildcards (`*`).
* Query parameters
    * `from`: Starting document offset. Defaults to `0`.
    * `size`: Number of hits to return. Defaults to `10`. By default, `from` + `size` cannot exceed `10000`.
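
To page through a larger result set with `from` and `size`, the offsets can be computed up front. The sketch below is an illustration only: `page_params` is a hypothetical helper, not part of any Elasticsearch client, and it assumes the default limit where `from` + `size` may not exceed `10000`. It only builds the query parameters and sends no HTTP request.

```python
MAX_RESULT_WINDOW = 10000  # assumed default limit for from + size


def page_params(total, size=1000, window=MAX_RESULT_WINDOW):
    """Yield {"from": ..., "size": ...} dicts covering `total` hits,
    stopping at the result window that from/size paging cannot go past."""
    if size <= 0:
        raise ValueError("size must be positive")
    reachable = min(total, window)
    for start in range(0, reachable, size):
        yield {"from": start, "size": min(size, reachable - start)}


if __name__ == "__main__":
    for params in page_params(total=2500, size=1000):
        # Each dict maps onto GET /<target>/_search?from=...&size=...
        print(params)
```

Anything beyond the window is not reachable this way, which is where the sliced scroll linked above comes in.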

#### Import
Use [Document API/Index API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html)
```
POST /<target>/_doc
```
* Body
    ```json
    {
        "@timestamp": "2022-05-04T12:31:05.113",
        "field1": "value1",
        "field2": "value2"
    }
    ```

Or use [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html) to import multiple documents in one request.
> [Other tutorial](https://kucw.github.io/blog/2018/7/elasticsearch-bulk/)

### Use elasticsearch-dump for index exporting and importing
:::warning
This is a third-party tool that can export an index in several formats. It requires a Node.js runtime environment.
:::

Use [elasticsearch-dump](https://github.com/elasticsearch-dump/elasticsearch-dump) to dump an index.

`git clone` the repository:
```shell
git clone https://github.com/elasticsearch-dump/elasticsearch-dump.git
```
Install with npm:
```shell
cd elasticsearch-dump
npm install elasticdump
```

#### Dump index
:::danger
The file path must be an absolute path.
:::
```shell
./bin/elasticdump \
  --input=http://<elasticsearch IP>:<port>/<index name> \
  --output=<file path>/<file name>.json
```

#### Import index
:::danger
The file path must be an absolute path.
:::
```shell
./bin/elasticdump \
  --input=<file path>/<file name>.json \
  --output=http://<elasticsearch IP>:<port>/<index name>
```

---

## Kibana
### Dev Tools
> Dev Tools can help you manage the data stored in ElasticSearch.

* Delete an index (supports wildcards):
    ```
    DELETE <index>
    ```

---

## Filebeat
> [Filebeat quick start: installation and configuration](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html)

### Add custom field
> [Add tags](https://www.elastic.co/guide/en/beats/filebeat/current/add-tags.html)

`tags` is the list of tags to add; it becomes the value of the field named by `target`. `target` is optional and defaults to `tags`.
```yaml=
processors:
  - add_tags:
      tags: ["custom-tag"]
      target: "custom-field"
```

---

## APM (Application Performance Monitoring)
The OSS version of Kibana doesn't support the APM dashboard.

### Java Agent
[apm-agent-java (GitHub)](https://github.com/elastic/apm-agent-java)

Official testing web app: [opbeans](https://github.com/elastic/opbeans-java)

### Run Karaf with Java agent
> https://www.elastic.co/guide/en/apm/guide/7.17/apm-quick-start.html#add-apm-integration-agents

Set `KARAF_OPTS` as:
```shell
export KARAF_OPTS="-javaagent:<java agent path> -Delastic.apm.service_name=<service name> -Delastic.apm.server_urls=<APM server url>"
```
* `-javaagent`: Path of the Java agent; should be a .jar file.
* `-Delastic.apm.service_name`: Shown in Kibana as the `service.name` field.
* `-Delastic.apm.server_urls`: URL of the APM server.

Then run Karaf.

### APM field
> [ref](https://www.elastic.co/guide/en/apm/guide/current/exported-fields.html)

* `transaction.name`: Generic designation of a transaction in the scope of a single service.
    > Seems like it's the method being used?

### Tuning Agent
https://www.elastic.co/guide/en/apm/agent/java/master/tuning-and-overhead.html

---

## Use Case: Collect Logs from Remote VM
> The server at "center" collects the logs from itself and from the remote machines. That is, the `input` of Logstash will contain both `beats` and `file`.

![](https://i.imgur.com/B6et9mv.png)

### Center
> The logs from "center" are read by Logstash directly.

1. Prepare docker images
    * [docker.elastic.co/kibana/kibana-oss:7.10.2](https://www.docker.elastic.co/r/kibana/kibana-oss)
    * [docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2](https://www.docker.elastic.co/r/elasticsearch/elasticsearch-oss)
    * [docker.elastic.co/logstash/logstash-oss:7.12.0](https://www.docker.elastic.co/r/logstash/logstash-oss)
    * [docker.elastic.co/apm/apm-server-oss:7.12.0](https://www.docker.elastic.co/r/apm/apm-server-oss)
2. Prepare a docker compose file
3.
   Prepare logstash.conf
    * `input`: `beats`, `file`
    * Prepare the regular expression for the message
4. Prepare apm-server.docker.yml
5. Prepare Java agent (for APM)
6. (optional) Prepare a script for fast deployment

### Remote machine
> These remote machines send logs to the Logstash of "Center" with Filebeat.

1. Prepare docker images
    * [docker.elastic.co/beats/filebeat-oss:7.12.0](https://www.docker.elastic.co/r/beats/filebeat-oss)
2. Prepare a docker compose file or a `docker run` command
3. Prepare filebeat.yml
4. (optional) Prepare a script for fast deployment

### Sample: logstash.conf at "Center"
Let's make it more complicated. Assume that "Center" runs an Apache Karaf service which:
* Writes a Karaf log at runtime
* Writes a GC (Garbage Collection) log at runtime
* Has some existing Karaf logs compressed as gzip files (.gz)
* Has some existing GC logs
* Will receive logs sent by remote Filebeat
* Runs at IP 192.168.12.34

Advanced requirements:
* If grok parsing fails, don't send the event to ElasticSearch.
* The value of each field should start and end with a non-whitespace character (`\S`).
    * regex sample for this requirement: `\s*(?<field>\S.+\S)\s*`

Karaf log sample:
```
2022-06-14T15:22:02,009 | DEBUG | ELK-Test-Thread | ELK-Test-Remote-logger | 999 - com.example.elk.test - 2.5.0.M18 | random message
```
GC log sample:
```
[2022-06-02T09:31:24.655-0800][0.074s][info ][gc ] Using G1
```
Remote log sample (uses an unusual timestamp format):
```
2022-06-14 15:22:02.0192||DEBUG| ELK-Test-Remote-logger || random message
```
:::warning
Note that this timestamp doesn't match ISO 8601, so we have to define a custom format.
Besides, since the maximum precision is milliseconds (`SSS`), the timestamp should not include the last digit (the `2` of `.0192`). We can use a regular expression or the [ruby filter](https://www.elastic.co/guide/en/logstash/current/plugins-filters-ruby.html) to deal with it.
:::

logstash.conf sample:
```=
input {
  file {
    path => ["/usr/share/logstash/centerlog/karaf*.log"]
    start_position => "beginning"
    type => "karaf"
    mode => "tail"
  }
  file {
    path => ["/usr/share/logstash/centerlog/karaf*.log.gz"]
    start_position => "beginning"
    type => "karaf"
    mode => "read"
  }
  file {
    path => ["/usr/share/logstash/centerlog/gc*.log"]
    start_position => "beginning"
    type => "gc"
    mode => "tail"
  }
  beats {
    port => 5044
    type => "remote"
  }
}
filter {
  if [type] == "karaf" {
    grok {
      match => { "message" => "(?<timestamp>\S.+\S)\s*\|\s*(?<level>\S.+\S)\s*\|\s*(?<thread>\S.+\S)\s*\|\s*(?<logger>\S.+\S)\s*\|\s*(?<bundle>\S.+\S)\s*\|\s*(?<msg>\S.+\S)\s*" }
    }
  } else if [type] == "gc" {
    grok {
      match => { "message" => "\[\s*(?<timestamp>\S.+\S)\s*\]\[\s*(?<time>\S.+\S)\s*\]\[\s*(?<level>\S.+\S)\s*\]\[\s*(?<label>\S.+\S)\s*\]\s*(?<msg>\S.+\S)\s*" }
    }
  } else if [type] == "remote" {
    grok {
      # The extra \S after the timestamp group drops the 4th fractional digit.
      match => { "message" => "(?<timestamp>\S.+\S)\S\s*\|\|\s*(?<level>\S.+\S)\s*\|\s*(?<logger>\S.+\S)\s*\|\|\s*(?<msg>\S.+\S)\s*" }
    }
  }
  date {
    match => [ "timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss.SSS" ]
    remove_field => ["timestamp"]
    timezone => "Asia/Taipei"
  }
}
output {
  if "_grokparsefailure" not in [tags] {
    if [type] == "karaf" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "karaf-%{+YYYY.MM.dd}"
      }
    } else if [type] == "gc" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "gc-%{+YYYY.MM.dd}"
      }
    } else if [type] == "remote" {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "remote-%{+YYYY.MM.dd}"
      }
    }
  }
}
```

### Sample: filebeat.yml at "Remote"
```yaml=
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /usr/share/filebeat/remotelog/remote*.log
output.logstash:
  hosts: ["192.168.12.34:5044"]
```

---

## Notes
```graphviz
digraph parent{
    rankdir=LR;
    st[shape=rect label="other input types"]
    file[shape=square]
    fb[label=Filebeat]
    mb[label=Metricbeat]
    ls[label=Logstash]
    otheropt[shape=rect label="other output types"]
    es[label=ElasticSearch]
    k[label=Kibana]
    apm[label=APM]
    file->fb
    fb->ls[label=input]
    st->ls[label=input]
    file->ls[label=input]
    ls->otheropt[label=output]
    mb->es
    apm->es[label="Java agent"]
    ls->es[label=output]
    es->k
}
```