# Experiment on Implementation (Fluentd)
###### tags: `By_Ivan`
```plantuml
skinparam activity {
BackgroundColor<< Result >> Lightblue
BackgroundColor<< Data >> Dimgray
BorderColor Peru
BorderColor<< Result >> Tomato
BorderColor<< Data >> White
FontColor<< Result >> Tomato
FontColor<< Data >> Lightgray
}
partition Database {
log_data<< Data >>-down>mongoDB
snmp_data<< Data >>-down>mongoDB
}
mongoDB-down>[mongo_tail]Fluentd
partition MOD #lightgreen{
others<< Data >>-down>[...]FluentBit
metrics<< Data >>-down>[cpu]FluentBit
syslog<< Data >>-down>[file]FluentBit
}
FluentBit->[parsing/filter]FluentBit
FluentBit-down->[forward] Fluentd
Fluentd->[filter/csv]Fluentd
Fluentd-down->[copy]==Routing==
==Routing== -down> SQL
-down>report << Result >>
==Routing== -down> Prometheus
-down>alert << Result >>
```
Related doc:
- [Fluentd](https://hackmd.io/@mcnlab538/B1fyufLQD)
- [FluentBit](https://hackmd.io/@mcnlab538/rJItAPi7P)
___
[TOC]
___
## Environment Setup
```plantuml
skinparam activity {
BackgroundColor<< Machine >> Lightblue
BackgroundColor DarkKhaki
FontColor white
FontName Aapex
}
partition "MacOS (Local)" {
MongoDB ->"python script"
"python script" -> MongoDB
}
partition "server (remote)"{
partition k8s #lightgray {
partition Docker #gray {
Fluentd -> MongoDB
}
}
partition centOS #yellowgreen {
FluentBit -right> Fluentd
}
}
MongoDB -up> Fluentd
```
```clike
" On Localhost: ~/.ssh/config "
Host server
...
RemoteForward 0.0.0.0:27017 localhost:27017
Host centOS
...
ProxyJump server
Host docker
...
ProxyJump server
```
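To confirm the tunnel works, the forwarded Mongo port can be sanity-checked from the remote server (assuming the legacy `mongo` shell is installed there):
```shell
$ # On server: the local MongoDB should now answer on 127.0.0.1:27017
$ mongo --host 127.0.0.1 --port 27017 --eval 'db.runCommand({ ping: 1 })'
```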
```shell
$ # On the Kubernetes controller: port expose setting
$ kubectl expose pod <pod-name> --name docker --type=NodePort --port=24224
$ # kubectl will assign a new node port: PORT
```
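kubectl prints the assigned node port when the service is created, and it can be looked up again later (the service name `docker` follows the command above):
```shell
$ # The PORT(S) column shows 24224:<PORT>/TCP for a NodePort service
$ kubectl get service docker
```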
```clike
// On pfsense (firewall)
1. Click Firewall -> NAT
2. Under the Port Forward tab, click on the Add button which has an arrow pointed down
3. Change Protocol to TCP
4. Destination Port Range -> Choose (other) and enter 24224
5. Enter the internal IP of the Kubernetes node in the Redirect target IP field
6. Change Redirect target port to Other and enter PORT
7. Add a description and save
// no need to expose port 27017
// the ssh RemoteForward setting already exposes the local Mongo database on the remote server
// Docker only needs to reach mongodb://server:27017 to access the Mongo database
```
:::warning
! kubectl is not running on the Kubernetes VM
There will be a controller in the cluster; use that machine to handle the port settings.
:::
## Tests
### Bit 2 D
###### First test on connecting the two machines (CentOS and Docker).
The goal is to collect the CentOS machine's CPU information and send it to Docker. This is accomplished with the "Forward" plugin, which is based on a TCP connection.
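Before touching the configs, reachability of the forward port from the CentOS side can be sanity-checked (a rough check, assuming `nc` is available):
```shell
$ # On centOS: checks that TCP 24224 on the server is reachable
$ nc -vz server 24224
```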
FluentBit config :
```r=
[INPUT]
Name cpu
Tag cpu_test
[OUTPUT]
Name forward
Host server
Port 24224
Match *
# Collects cpu information
# Forward the json message to the destination
```
FluentD config :
```r=
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match *>
@type stdout
</match>
# Listen to the port
# Print every message to stdout
```
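With both configs in place, the daemons can be started as usual (the config file names here are placeholders, and the Fluent Bit binary may be `td-agent-bit` depending on how it was installed):
```shell
$ # On Docker: run Fluentd with the config above
$ fluentd -c fluent.conf
$ # On centOS: run Fluent Bit with its config
$ fluent-bit -c fluent-bit.conf
```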
The data is printed in syslog style by default:
```shell!
$ 2020-09-02 02:24:20.050872843 +0000 bit_test: {"cpu_p":0.0,"user_p":0.0,"system_p":0.0,"cpu0.p_cpu":0.0,"cpu0.p_user":0.0,"cpu0.p_system":0.0,"cpu1.p_cpu":0.0,"cpu1.p_user":0.0,"cpu1.p_system":0.0,"cpu2.p_cpu":0.0,"cpu2.p_user":0.0,"cpu2.p_system":0.0,"cpu3.p_cpu":0.0,"cpu3.p_user":0.0,"cpu3.p_system":0.0,"cpu4.p_cpu":0.0,"cpu4.p_user":0.0,"cpu4.p_system":0.0,"cpu5.p_cpu":0.0,"cpu5.p_user":0.0,"cpu5.p_system":0.0,"cpu6.p_cpu":0.0,"cpu6.p_user":0.0,"cpu6.p_system":0.0,"cpu7.p_cpu":0.0,"cpu7.p_user":0.0,"cpu7.p_system":0.0}
```
To print the JSON message in a pretty style, install the formatter plugin and add the pretty_json format in the match directive.
```shell
$ fluent-gem install fluent-plugin-formatter_pretty_json
```
```r=6
<match *>
@type stdout
format pretty_json
</match>
```
Output result:
```jsonld
{
"cpu_p": 0.0,
"user_p": 0.0,
"system_p": 0.0,
"cpu0.p_cpu": 0.0,
"cpu0.p_user": 0.0,
"cpu0.p_system": 0.0,
"cpu1.p_cpu": 0.0,
"cpu1.p_user": 0.0,
"cpu1.p_system": 0.0,
"cpu2.p_cpu": 0.0,
"cpu2.p_user": 0.0,
"cpu2.p_system": 0.0,
"cpu3.p_cpu": 0.0,
"cpu3.p_user": 0.0,
"cpu3.p_system": 0.0,
"cpu4.p_cpu": 0.0,
"cpu4.p_user": 0.0,
"cpu4.p_system": 0.0,
"cpu5.p_cpu": 0.0,
"cpu5.p_user": 0.0,
"cpu5.p_system": 0.0,
"cpu6.p_cpu": 0.0,
"cpu6.p_user": 0.0,
"cpu6.p_system": 0.0,
"cpu7.p_cpu": 0.0,
"cpu7.p_user": 0.0,
"cpu7.p_system": 0.0
}
```
### D 2 Mongo
###### Testing the output function and database integration.
The goal is to output FluentD messages to a NoSQL/SQL database. In this case we're using MongoDB as an example.
The MongoDB integration plugin can be installed with gem:
```shell
$ fluent-gem install fluent-plugin-mongo
```
FluentD config:
```r=
<match *>
@type mongo
database fluent
replace_dot_in_key_with _
host server
port 27017
collection d_cpu
</match>
# input data from fluentBit
# same as the setting in the previous test
```
:::warning
To connect MongoDB and Docker (and Kubernetes), please refer to the [Environment Setup](https://hackmd.io/KxBEZ5BOSk6FgCqvVef5Jg?both#Environment-Setup)
:::
MongoDB result:
```shell!
$ sudo mongo
---
...
---
> show dbs
admin 0.000GB
config 0.000GB
fluent 0.000GB
> use fluent
switched to db fluent
> show collections
d_cpu
> db.d_cpu.find().limit(1)
{ "_id" : ObjectId("5f4c970be25cb918e215d1f1"), "date" : 1598854853.052838, "cpu_p" : 0, "user_p" : 0, "system_p" : 0, "cpu0_p_cpu" : 0, "cpu0_p_user" : 0, "cpu0_p_system" : 0, "cpu1_p_cpu" : 0, "cpu1_p_user" : 0, "cpu1_p_system" : 0, "cpu2_p_cpu" : 0, "cpu2_p_user" : 0, "cpu2_p_system" : 0, "cpu3_p_cpu" : 0, "cpu3_p_user" : 0, "cpu3_p_system" : 0, "cpu4_p_cpu" : 0, "cpu4_p_user" : 0, "cpu4_p_system" : 0, "cpu5_p_cpu" : 0, "cpu5_p_user" : 0, "cpu5_p_system" : 0, "cpu6_p_cpu" : 0, "cpu6_p_user" : 0, "cpu6_p_system" : 0, "cpu7_p_cpu" : 0, "cpu7_p_user" : 0, "cpu7_p_system" : 0, "time" : ISODate("2020-08-31T06:20:58.068Z") }
```
### Mongo 2 D 2 Mongo
###### Testing queries against the Mongo database
Ideally, we hope to track every update in MongoDB.
The mongo_tail plugin is packaged with the mongo plugin; installing the mongo support installs both via gem.
The plugin periodically queries the Mongo database starting from the last "_id" it read. Any record newer than that "_id" is put into the processing pipeline.
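A rough pymongo sketch of this tailing idea (not the plugin's actual code; the host, database, and collection names follow the config further below):
```python
import time
from pymongo import MongoClient

client = MongoClient("mongodb://server:27017")
coll = client["snmp"]["IN"]

last_id = None
while True:
    # only fetch documents newer than the last ObjectId seen so far
    query = {"_id": {"$gt": last_id}} if last_id else {}
    for doc in coll.find(query).sort("_id", 1):
        print(doc)            # in Fluentd this would enter the processing pipeline
        last_id = doc["_id"]
    time.sleep(10)            # corresponds to wait_time in the config
```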
The following Python script keeps updating the database with SNMP information.
```python=
import os, time, csv, pymongo
def MGdbinsert(collection, snmp_oid, time_t, value, type1):
    # insert one SNMP reading into the given MongoDB collection
    content = {"oid": snmp_oid, "time_t": time_t, "value": value, "type": type1}
    collection.insert_one(content)

def update_init(host_id, collection, sleep_time):
    while True:
        t = time.time()
        # getlist.csv holds one SNMP OID per row
        with open('getlist.csv', 'r') as csvFile:
            rows = csv.reader(csvFile, delimiter=',')
            for row in rows:
                result = os.popen("snmpwalk -v 2c -c public " + host_id + " " + row[0]).readlines()
                for res in result:
                    try:
                        # each line looks like "OID = TYPE: VALUE"
                        x = res.replace('\n', '').split("= ", 1)
                        y = x[1].split(": ", 1)
                        MGdbinsert(collection, row[0], t, y[1], y[0])
                    except:
                        pass
        print("database updated")
        time.sleep(sleep_time)
```
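The script only defines the functions; a hypothetical driver to kick off the polling loop could look like this (the agent IP and interval are placeholders):
```python
# continues from the script above (pymongo is already imported there)
client = pymongo.MongoClient("mongodb://localhost:27017")
collection = client["snmp"]["IN"]            # database "snmp", collection "IN"
update_init("192.168.0.10", collection, 10)  # hypothetical SNMP agent IP, 10 s interval
```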
Fluentd config:
```r=
<source>
@type mongo_tail
tag snmp_in
wait_time 10
time_key time
# ip and port of MongoDB
host server
port 27017
# specify which db and collection to watch
database snmp
collection IN
</source>
<match snmp_in>
@type mongo
host server
port 27017
database snmp
collection OUT
</match>
# read from snmp.IN
# write to snmp.OUT
```
Before being read by Fluentd:
```shell!
> use snmp
switched to db snmp
> show collections
IN
> db.IN.findOne({"_id":ObjectId("5f4da4f9c36313e5d1a50e72")})
{
"_id" : ObjectId("5f4da4f9c36313e5d1a50e72"),
"oid" : ".1.3.6.1.2.1.1.1.0",
"time_t" : 1598924025.051392,
"value" : "Linux pcsns 4.4.0-185-generic #215-Ubuntu SMP Mon Jun 8 21:53:19 UTC 2020 x86_64",
"type" : "STRING"
}
```
After being read by Fluentd:
```shell!
> show collections
IN
OUT
> db.OUT.findOne({"_id_str":"5f4da4f9c36313e5d1a50e72"})
{
"_id" : ObjectId("5f4da53be25cb922f3a97276"),
"oid" : ".1.3.6.1.2.1.1.1.0",
"time_t" : 1598924025.051392,
"value" : "Linux pcsns 4.4.0-185-generic #215-Ubuntu SMP Mon Jun 8 21:53:19 UTC 2020 x86_64",
"type" : "STRING",
"_id_str" : "5f4da4f9c36313e5d1a50e72",
"time" : ISODate("2020-09-01T01:33:46.211Z")
}
```
### csv Test
###### Parsing any CSV-format message
Fluentd uses the tail plugin to track the CSV text, reading each line as a separate event.
In the parse directive, the csv parser matches the fields with the given keys (column names).
```r=
<source>
@type tail
path /opsws_n_test.txt
<parse>
@type csv
keys col0,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20,col21,col22,col23,col24,col25,col26
</parse>
tag csv_test
</source>
<match *>
@type stdout
</match>
```
Output in stdout:
```shell!
$ 2020-09-03 17:20:31.550659648 +0800 csv_test: {"col0":"2020/08/13:05:58:22","col1":"543647","col2":"MOV00000002251101372","col3":"MOV00000002251101372","col4":"PRE00000002251101372","col5":"逆轉裁判 第01集","col6":"Ace Attorney","col7":"PG-13","col8":"1429","col9":"成步堂龍一,綾里真宵","col10":null,"col11":"第1集【第一次的逆轉】,新人律師──成步堂龍一第一次接下的委託案就是樁殺人案,而且兇手還是自己兒時的好友,堅信好友清白的成步堂決定站上法庭證明朋友不是兇手,但是種種證據卻對朋友相當不利,究竟成步堂能不能找出隱藏的虛偽中的真實,順利打贏這場官司呢?!","col12":"HD","col13":"15988","col14":"/動漫館-卡通199/推理懸疑/逆轉裁判S1","col15":"0","col16":"30:00:00","col17":"2018/12/07:00:00:00","col18":"2021/12/30:23:59:59","col19":"綜合其他","col20":"2251","col21":"0000","col22":"0000","col23":"VOD","col24":"動漫館-卡通199","col25":"推理懸疑","col26":"逆轉裁判S1"}
```
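Conceptually, the key-to-field matching done by the csv parser amounts to something like this rough Python illustration (sample line shortened to three columns):
```python
import csv, json

keys = [f"col{i}" for i in range(27)]        # the same keys listed in the config
line = "2020/08/13:05:58:22,543647,MOV00000002251101372"

fields = next(csv.reader([line]))            # split the line on commas
record = dict(zip(keys, fields))             # pair each field with its column name
print(json.dumps(record, ensure_ascii=False))
```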
:::danger
Pretty format does not work in this case. It seems that there is an encoding issue with the plugin.
```shell!
$ 2020-09-04 14:59:06 +0800 [warn]: #0 emit transaction failed: error_class=Encoding::UndefinedConversionError error="\"\\xE9\" from ASCII-8BIT to UTF-8" location="/usr/share/ruby/json/common.rb:286:in `generate'" tag="csv_test"
```
This should be an easy problem to solve; however, it requires modifying an existing plugin.
:::
### Script input Test (FluentD)
###### The challenge here is to separate the printed lines from the script output.
FluentD would treat the entire output result as a single message. The goal is to cut it into distinct JSON lines.
FluentD config:
```r=
<source>
@type exec
command sadf -p -P ALL -- 1 1 # get cpu message
run_interval 10
<parse>
@type none
# none type parser would cut each line by \n
</parse>
tag exec_test
</source>
<filter exec_test>
@type parser
key_name message
<parse>
@type tsv
keys host, interval, timestamp, cpu_num, type, pct
# actual parsing job
</parse>
</filter>
<match *>
@type stdout
format pretty_json
</match>
```
Output result:
```jsonld
{
  "host": "fedora.lan",
  "interval": "600",
  "timestamp": "2020-09-03 16:10:25 UTC",
  "cpu_num": "all",
  "type": "%user",
  "pct": "0.00"
}
```
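The two parsing stages can be illustrated with a rough Python sketch (assuming, as the tsv parser above implies, that `sadf -p` output is tab-separated):
```python
import subprocess

keys = ["host", "interval", "timestamp", "cpu_num", "type", "pct"]
out = subprocess.run(["sadf", "-p", "-P", "ALL", "--", "1", "1"],
                     capture_output=True, text=True).stdout

for line in out.splitlines():                   # what the "none" parser does
    record = dict(zip(keys, line.split("\t")))  # what the tsv parser does
    print(record)
```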
___
[TOC]
___
Created 2020/09/01
Last Modified 2020/09/01
___