# Workshop Elastic Stack

## Inleiding
Tijdens de crash course kregen jullie een overzicht van het big data landschap. Er werden een aantal pipelines besproken, evenals hoe deze kunnen opgebouwd worden. Tijdens deze workshop is het de bedoeling om de theorie om te zetten in de praktijk.
Jullie gaan aan de slag met de zogenaamde [Elastic Stack](https://www.elastic.co/products/). De dataset bestaat uit Tweets die geselecteerd zijn op de hashtag *#metoo*. De bedoeling is om m.b.v. [Logstash](https://www.elastic.co/products/logstash), [Elasticsearch](https://www.elastic.co/products/elasticsearch) en [Kibana](https://www.elastic.co/products/kibana) de data te **verwerken**, te **analyseren** en te **visualiseren**. Deze combinatie van software werd vroeger de [ELK Stack](https://www.elastic.co/elk-stack) genoemd.
## De pipeline
In deze workshop krijgen jullie toegang tot een endpoint waarop de data reeds beschikbaar is. Die data komt natuurlijk niet uit het niets. Het is niet noodzakelijk de volledige architectuur tot in detail te begrijpen. Toch even een korte samenvatting van waar de data nu precies komt.

Twitter heeft de mogelijkheid om data op te vragen a.d.h.v. een [API](https://developer.twitter.com/). Hiervoor is een developer account vereist. In deze workshop wordt deze API aangesproken via een Python library, namelijk [Tweepy](http://docs.tweepy.org/en/latest/). Met een paar lijntjes code worden alle Tweets met *#metoo* opgevraagd en opgeslagen (in JSON-formaat).
Tijdens de workshop zal een ander Python script de data uitlezen. Het script gaat niets met de data doen, buiten het aan een bepaalde snelheid "producen". Hierdoor ontstaat er als het ware een constante stream aan data.
De Tweets worden (tijdelijk) opgeslagen (producen) in een zogenaamde message broker ([Kafka](https://kafka.apache.org/)). Beschouw Kafka gerust als een centrale opslag waarop andere applicaties kunnen inpikken.
Deze stream zal worden uitgelezen (consumen) door Logstash, behorende tot de Elastic Stack. Logstash kan beschouwd worden als een ETL-tool (Extraction, Transformation, Load). Elke Tweet wordt bekeken, overtollige JSON fields worden weggegooid, er wordt een veld toegevoegd, ... . Uiteindelijk zal Logstash de data doorpompen naar Elasticsearch.
Tot slot gaan we nog een andere component uit de Elastic Stack gebruiken, namelijk Kibana. Deze tool laat o.a. toe om de data op een gebruiksvriendelijke manier te ondervragen evenals te visualiseren. Daarnaast kan Kibana gebruikt worden om de Elastic Stack te managen.
In principe zou deze case perfect uitgewerkt kunnen worden met uitsluitend Logstash ([Twitter input plugin](https://www.elastic.co/guide/en/logstash/7.1/plugins-inputs-twitter.html)), Elasticsearch en Kibana. Zonder Kafka dus. De Twitter API heeft bepaalde limitaties, dit is dan ook de voornaamste reden waarom voor deze oplossing is gekozen.
Logstash heeft zogenaamde [input plugins](https://www.elastic.co/guide/en/logstash/7.1/input-plugins.html), o.a. eentje voor [Twitter](https://www.elastic.co/guide/en/logstash/7.1/plugins-inputs-twitter.html). Op die manier is het mogelijk om de data rechtstreeks via de Twitter API op te vragen. Daarnaast heeft Logstash gelijkaardige mechanismen zoals Kafka aan boord, weliswaar gelimiteerd. Wie daar interesse in heeft, kan best eens kijken naar [Logstash persistent queues](https://www.elastic.co/guide/en/logstash/7.1/persistent-queues.html).
## Workshop omgeving
Tijdens deze workshop kunnen jullie aan de slag met een virtuele machine (VM) waarop reeds software geïnstalleerd en geconfigureerd is.
* [Ubuntu server](https://ubuntu.com/server) 18.04.2 LTS (4.15.0-52-generic)
* [Docker](https://www.docker.com/) version 18.09.6, build 481bc77
* [Docker-compose](https://docs.docker.com/compose/) version 1.24.0, build 0aa59064
Via SSH kan verbonden worden met deze VM (bv. via [Putty](https://www.putty.org/)). De **credentials** zijn terug te vinden op een **apart blaadje**.
* IP: 172.23.82.60
* Username: sslab
* Password: 123
* SSH poort: zie blaadje
* Elasticsearch poort: zie blaadje
* Kibana poort: zie blaadje
## Elastic Stack opzetten
De Elastic Stack ondersteunt zowat alle populaire operating systems, [installatie](https://www.elastic.co/guide/en/elastic-stack/7.1/installing-elastic-stack.html) kan via een archief, package en eventueel met kant-en-klare Docker containers. In deze workshop is voor de laatste optie gekozen.
Het correct opzetten en configureren van de Elastic Stack kan tijdrovend zijn, vooral omwille van gevoelige config files. Meer info? Vraag gerust! Mailtjes na de workshop zijn welkom bij [Esli Heyvaert](mailto:esli@sizingservers.be).
### docker-compose.yml
Navigeer naar de `workshop` directory, en bekijk de inhoud van `docker-compose.yml` (met een tool naar keuze).
```
cd ~/workshop
nano docker-compose.yml
```
In dit bestand worden een aantal verschillende *services* opgelijst en geconfigureerd:
* 3 Elasticsearch nodes
* 1 Logstash node
* 1 Kibana instance
Tijdens deze workshop ligt de focus op de Elastic Stack, het Docker gedeelte komt niet aan bod. Dadelijk worden de services aan de hand van dit bestand gestart.
### kafka_logstash.conf
Het bestand `kafka_logstash.conf` beschrijft een zogenaamde [Logstash pipeline](https://www.elastic.co/guide/en/logstash/7.1/pipeline.html).
De eerste blok configureert de [Kafka input plugin](https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html). Dit zijn m.u.v. `codec` allemaal standaard [Kafka consumer settings](https://kafka.apache.org/documentation/#consumerconfigs). De `codec` geeft aan in welk formaat de data mag verwacht worden, in dit geval zijn de Tweets opgeslagen in een JSON-formaat. Elasticsearch zelf gebruikt eveneens dit formaat.
Men kan gebruik maken van meerdere inputs, eventueel kan de data gecombineerd worden of verder verrijkt worden d.m.v. [lookup enrichments](https://www.elastic.co/guide/en/logstash/7.1/lookup-enrichment.html).
```
input {
kafka {
bootstrap_servers => "bd-kafka-00.cloud2.local:9092"
auto_offset_reset => "earliest"
group_id => "${IP}"
topics => ["metoo-workshop-test"]
codec => json
}
}
```
In de tweede blok wordt de data getransformeerd. Het Python script dat de Twitter API bevraagt kuist de data niet op, m.a.w. elk mogelijk veld dat bij een Tweet hoort wordt doorgestuurd naar de Kafka cluster.
Veel van die velden hebben we niet nodig, deze gaan we dan ook niet opslaan in de Elasticsearch cluster.
Het veldje `source` wordt hernoemd, aangezien dit in de context van Elasticsearch verwarrend kan zijn. Wanneer in Elasticsearch een document wordt geïndexeerd (toegevoegd) wordt de originele content altijd bijgehouden in [`_source`](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/mapping-source-field.html). Een logischere naam is `device`: het type apparaat of platform vanwaar de Tweet is verzonden (bv. Twitter for Android).
Bij `device` wordt normaal gezien altijd een URL meegegeven, dus bv. een link om een of andere app te downloaden. Met `gsub` halen we deze overbodige data weg.
Met de `date` filter zorgen we dat de datum correct geparsed en opgeslagen wordt als een timestamp.
Tot slot wordt er tijdens het *indexeren* een stukje Ruby code uitgevoerd. Het aantal karakters van elke Tweet wordt opgeslagen in het veld `tweet_lenght`. In principe kan dit ook bepaald worden wanneer we de data bevragen. Dit impliceert dat de lengte telkens opnieuw zou berekend worden, wat niet bijster efficiënt zou zijn.
```
filter {
mutate {
remove_field => ["id_str", "place", "..."]
}
mutate {
rename => ["source", "device" ]
}
mutate {
gsub => [
"device", "<.*?>", ""
]
}
date {
match => [ "created_at", "EEE MMM dd HH:mm:ss Z yyyy" ]
locale => "en-US"
}
ruby {
code => 'event.set("tweet_length", event.get("full_text").length)'
}
}
```
Deze laatste blok beschrijft de [output plugins](https://www.elastic.co/guide/en/logstash/7.1/output-plugins.html). Telkens een Tweet verwerkt wordt zal een *"."* naar de console geprint worden. Daarnaast zal de opgekuiste data opgeslagen worden in de Elasticsearch cluster. Het gedeelte i.v.m. `templates` komt later in deze workshop aan bod.
Net zoals bij de *inputs* kunnen er meerdere *outputs* worden ingesteld. Logstash kan m.a.w. volledig los gebruikt worden van Elasticsearch.
```
output {
stdout { codec => dots }
elasticsearch {
hosts => ["es01:9200", "es02:9200", "es03:9200"]
index => "tweets"
template => "/workshop/twitter_template.json"
template_name => "tweets"
template_overwrite => true
}
}
```
### Stack starten
De volledige Elastic Stack kan opgestart worden als volgt:
```
cd ~/workshop
docker-compose up -d
```
Na enkele seconden zou de output er als volgt moeten uitzien:

#### Status van de services (containers) controleren
De status van de verschillende Docker containers kan opgevraagd worden door `docker-compose ps`. De output zou er als volgt moeten uitzien:
```
Name Command State Ports
--------------------------------------------------------------------------------------
es-node-01 /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 9300/tcp
es-node-02 /usr/local/bin/docker-entr ... Up 9200/tcp, 9300/tcp
es-node-03 /usr/local/bin/docker-entr ... Up 9200/tcp, 9300/tcp
kibana /usr/local/bin/kibana-docker Up 0.0.0.0:5601->5601/tcp
logstash /usr/local/bin/docker-entr ... Up 5044/tcp, 9600/tcp
```
Logs kunnen opgevraagd worden met `docker-compose logs [-f]` of `docker-compose logs [-f] <service-name>`. `-f` is optioneel, staat voor *follow* (zoals bij `tail -f`). De naam van de service wordt gedefinieerd in het bestand `docker-compose.yml` (bv. es01, logstash, ...).
#### Status van de Elasticsearch cluster controleren
Informatie over een node kan o.a. als volgt opgevraagd worden.
```
curl -XGET http://localhost:9200
```
De output zou er als volgt moeten uitzien:
```
{
"name" : "es-node-01",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "ckI5uiK2QOSmi0zSeQDclQ",
"version" : {
"number" : "7.1.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "7a013de",
"build_date" : "2019-05-23T14:04:00.380842Z",
"build_snapshot" : false,
"lucene_version" : "8.0.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
```
Je kan eventueel ook surfen naar http://172.23.82.60:elasticsearch-poort. Dit zou hetzelfde resultaat moeten geven.
De [cluster health API](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/cluster-health.html) kan ons heel wat info geven wat betreft de gehele cluster. Voer onderstaand commando uit:
```
curl -XGET http://localhost:9200/_cluster/health?pretty
```
De output zou er **ongeveer** zoals hieronder moeten uitzien. Later in deze workshop wordt een deel van de info verder besproken.
```
{
"cluster_name" : "docker-cluster",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 3,
"number_of_data_nodes" : 3,
"active_primary_shards" : 4,
"active_shards" : 11,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}
```
Probeer zeker ook eens volgende commando's uit:
<pre>
# Overzicht nodes; ?v = verbose, werkt enkel bij de <a href="https://www.elastic.co/guide/en/elasticsearch/reference/7.1/cat.html">cat API</a>
curl -XGET http://localhost:9200/_cat/nodes?v
# Info over de nodes, q om te verlaten, met pijltjestoetsen scrollen
curl -XGET http://localhost:9200/_nodes?pretty | less
# Enkel de stats van de nodes weergeven
curl -XGET http://localhost:9200/_nodes/stats?pretty | less
</pre>
## Data bevragen
Als alles goed is, zouden er nu voortdurend nieuwe Tweets in je Elasticsearch cluster moeten terechtkomen. De data bevragen, analyseren en visualiseren gaan jullie doen m.b.v. Kibana.
### Kibana
Normaal gezien zou je moeten kunnen surfen naar http://172.23.82.60:kibana-poort. Klik op "Explore on my own". Vooraleer effectief de data te bevragen, kan je indien gewenst monitoring inschakelen. Dit geeft een snel, mooi overzicht van je Elastic Stack.

Na een dertigtal seconden zou de interface gelijkaardig moeten zijn aan onderstaande screenshot. Merk op: er kan eventueel gespeeld worden met de time-range & auto-refresh.

Daarna mag je navigeren naar de *"Dev Tools"* (steeksleutel).
### Data verkennen
Voer volgende query uit. Doe dit door op de groene pijl te klikken of door de toetsencombinatie `CTRL + ENTER`. De query's worden *"in blok"* uitgevoerd, je kan dus meerdere query's onder mekaar plaatsen.
```
GET tweets/_search
```
De output ziet er wellicht ongeveer zo uit:

Alvast een aantal opmerkingen:
* Took: hoelang duurde het om de query uit te voeren
* Indien de query een aantal keer uitgevoerd wordt, zal dit verlagen (caching)
* Het totaal aantal hits is groter dan of gelijk aan (gte) 10000.
* Sinds [Elasticsearch versie 7.0](https://www.elastic.co/blog/elasticsearch-7-0-0-released) wordt soms een schatting gemaakt, dit is sneller.
* Om het exact aantal te krijgen kan volgende query gebruikt worden:
```
GET tweets/_search
{
"track_total_hits": true
}
```
* De `score` en `max_score` zal in dit geval altijd 1 zijn: hoe goed is het antwoord op de query.
De volgende query zal zoeken naar Tweets waarin de naam *"Harvey Weinstein"* voorkomt.
```
GET tweets/_search
{
"query": {
"match": {
"full_text": "harvey WEINSTEIN"
}
}
}
```
De resultaten zien er (hopelijk) correct uit. Probeer nu eens dezelfde query uit te voeren met *"Bart De Pauw"*.
```
GET tweets/_search
{
"query": {
"match": {
"full_text": "Bart De Pauw"
}
}
}
```
De resultaten zijn wellicht teleurstellend. Plaats eventueel `"size": 100` voor `"query": ...`. De [*"match query"*](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/query-dsl-match-query.html) behandeld elk woord apart, tussen elk woord kan als het ware een `"OR"` gezet worden. Om de vorige query te verbeteren kan er ook een `"AND"` tussen elk woord gezet worden.
```
GET tweets/_search
{
"size": 100,
"query": {
"match": {
"full_text": {
"query": "Bart De Pauw",
"operator": "and"
}
}
}
}
```
Dit zou een beter resultaat moeten geven. Onderstaande query is mogelijks een andere oplossing:
```
GET tweets/_search
{
"size": 100,
"query": {
"match": {
"full_text": {
"query": "Bart De Pauw",
"minimum_should_match": 2
}
}
}
}
```
Er zijn nog tal van andere manieren om een accuraat antwoord te krijgen op vragen zoals bovenstaande. Bekijk bijvoorbeeld eens de [match phrase query](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/query-dsl-match-query-phrase.html) of de [bool query](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/query-dsl-bool-query.html).
Verschillende type query's kunnen eenvoudig gecombineerd worden, bekijk volgende query eens aandachtig:
```
GET tweets/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"full_text": "weinstein"
}
},
{
"range" : {
"tweet_length": {
"gte" : 100
}
}
}
],
"must_not": [
{
"match": {
"lang": "nl"
}
}
]
}
}
}
```
De query zal alle Tweets weergeven met:
* In de tekst "weinstein"
* Tweets met meer dan of gelijk aan 100 karakters
* In alle mogelijke talen, behalve in het Nederlands
### Eenvoudige analyses
Volgende query haalt alle Nederlandstalige Tweets op:
```
GET tweets/_search
{
"query": {
"match": {
"lang": "nl"
}
}
}
```
Volgende query is een zogenaamde [aggregation](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/search-aggregations.html), meer bepaald een [metrics aggregation](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/search-aggregations-metrics.html). De query geeft de gemiddelde lengte van alle Tweets weer. Merk op dat "size": 0 wordt meegegeven, de Tweets zelf zullen dus niet worden weergegeven (efficiëntie).
```
GET tweets/_search
{
"size": 0,
"aggs": {
"average_message_size": {
"avg": {
"field": "tweet_length"
}
}
}
}
```
De vorige twee query's kunnen als volgt gecombineerd worden:
```
GET tweets/_search
{
"size": 0,
"query": {
"match": {
"lang": "nl"
}
},
"aggs": {
"average_message_size_dutch_tweets": {
"avg": {
"field": "tweet_length"
}
}
}
}
```
Om het gemiddelde te verkrijgen hebben we `"avg"` gebruikt. Om de langste en korstte Tweet te verkrijgen kan je respectievelijk gebruik maken van `"min"` en `"max"`. Onderstaande query combineert al deze opties:
```
GET tweets/_search
{
"size": 0,
"aggs": {
"tweet_length_stats": {
"stats": {
"field": "tweet_length"
}
}
}
}
```
Elasticsearch laat ons ook toe om [histograms](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/search-aggregations-bucket-histogram-aggregation.html) te genereren. Volgende query deelt de Tweets in op basis van lengte.
```
GET tweets/_search
{
"size": 0,
"aggs": {
"tweet_length_histogram": {
"histogram": {
"field": "tweet_length",
"interval": 50
}
}
}
}
```
Ook hier kan je weer verschillende aggregaties gaan nesten:
```
GET tweets/_search
{
"size": 0,
"aggs": {
"tweet_length_histogram": {
"histogram": {
"field": "tweet_length",
"interval": 50
},
"aggs": {
"tweet_stats": {
"stats": {
"field": "tweet_length"
}
}
}
}
}
}
```
Tot slot nog een query om de 10 populairste device-categorieën te bepalen:
```
GET tweets/_search
{
"size": 0,
"aggs": {
"devices_terms": {
"terms": {
"field": "device",
"size": 10
}
}
}
}
```
## Data visualiseren
Navigeer naar *"Visualize"* in Kibana. In het vak *"Create index pattern"* geef je als *"Index pattern" "tweets"* op, daarna klik je op *"Next step"*. Bij *"Time Filter field name"* selecteer je *"@timestamp"*, tot slot klik je op *"Create index pattern"*.
Ga naar *"Visualize"*, klik op *"Create a visualization"*, daarna op *"Vertical bar"*. Selecteer *"tweets"*, bij *"Buckets"* klik op *"X-Axis"*, selecteer een *"Histogram"* aggregation, als *"Field"* kies je voor *"tweet_length"*, *"Minimum interval"* stel je in op *"50"*. Bij *"Custom Label"* kan je bv. *"Tweet Length"* opgeven. Bij de *"Y-Axis"* kan je eventueel ook nog het *"Custom Label"* aanpassen naar *"Tweets"*. Daarna kan je op de blauwe pijl drukken en zal de grafiek verschijnen. **Pas de time-range aan** (bovenaan rechts).

In sommige categorieën zullen maar een aantal Tweets zitten, je kan deze er eventueel uitfilteren, klik daarvoor bovenaan op *"Add filter"*, als *"Field"* selecteer je *"tweet_length"*, als *"Operator"* kies je *"is between"* en dan bv. *"From"* 0 *"To"* 350.

Bij de *"X-Axis"* zou je bijvoorbeeld nog *"sub-buckets"* kunnen toevoegen. Zie bijvoorbeeld zoals onderstaande screenshot.

Het eindresultaat zie je op onderstaande screenshot. De visualisatie groepeert de Tweets volgens hun berichtlengte. Binnen elke groep wordt weergegeven hoeveel Tweets er per type apparaat geïndexeerd zijn.

## Mapping
Mapping in Elasticsearch is een belangrijk concept wanneer efficiëntie wordt nagestreefd. Het doel van mapping is om enerzijds de data op een correcte (compacte) manier op te slaan en anderzijds de query-time zo laag mogelijk te houden. Bij mapping zorgen we er voor dat onze data (deels) aan een bepaalde schema voldoet.
Het is niet de bedoeling om in deze workshop daar diep op in te gaan. Voor wie meer wil weten, kan de [mapping documentatie](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/mapping.html) best eens doorlezen.
Toch willen we graag nog enkele pointers meegeven wat betreft mapping.
Het doel van volgende query is alle Tweets na 1 januari 2020 op te vragen. Voer uit:
```
GET tweets/_search
{
"query": {
"range": {
"created_at": {
"gt": "2020-01-01"
}
}
}
}
```
Het resultaat is wellicht niet wat je had verwacht.
Bekijk de mapping van de Tweets index:
```
GET tweets/_mapping
```
Indien je `created_at` aanpast naar `@timestamp` zou het resultaat realistischer moeten zijn:
```
GET tweets/_search
{
"query": {
"range": {
"@timestamp": {
"gt": "2020-01-01"
}
}
}
}
```
Merk op dat Elasticsearch bij het toevoegen van nieuwe *documents* zelf een mapping zal proberen aanmaken wanneer dit nodig blijkt te zijn. Het `created_at` field is daar een voorbeeld van. Echter werd voor jullie de mapping reeds ingesteld, bekijk maar eens de inhoud van `twitter_template.json`.
## Sharding
Herinner je het `cURL` commando, in Kibana kan dat uitgevoerd worden als volgt:
```
GET _cluster/health
```
Als laatste deel van deze workshop bekijken we kort hoe Elasticsearch zijn data verdeelt: sharding.
In de mapping template (`twitter_template.json`), staat ingesteld dat de Twitter index aangemaakt moet worden met 3 primary shards en 2 replicas. Jullie kregen tijdens de introductie wat uitleg hierover.
Met de `_cat` API kunnen we de *indices* en *shards* ietwat grondiger bekijken:
```
GET _cat/indices?v
GET _cat/shards?v
```
Je zal zien dat de *shards* mooi worden verdeeld over de verschillende nodes. Een *replica shard* zal nooit aan dezelfde node worden toegewezen waar reeds de *primary shard* is opgeslagen. Wanneer *(primary / replica) shards* verplaatst, aangemaakt of verwijdert dienen te worden, resulteert dit in een *task*. Hetzelfde geldt voor het aanmaken en verwijderen van indices.
Stop nu de derde Elasticsearch node:
```
docker-compose stop es03
```
Bekijk meteen een aantal keer de output van volgende commando's:
```
GET _cluster/health
GET _cat/shards?v
```
Je zou quasi onmiddellijk het effect moeten zien, bekijk zeker ook de status van de cluster. Wanneer er nog een node zou uitgeschakeld worden, zal de cluster niet langer operationeel zijn. Het beoogde [quorum](https://www.elastic.co/guide/en/elasticsearch/reference/7.1/modules-discovery-quorums.html) is dan niet langer behaald. Elasticsearch kan niet langer garanderen dat de data correct wordt opgeslagen en dat er geen data-loss gaat optreden.