# Experiment on Implementation (Fluentd)
###### tags: `By_Ivan`

```plantuml
skinparam activity {
    BackgroundColor<< Result >> Lightblue
    BackgroundColor<< Data >> Dimgray
    BorderColor Peru
    BorderColor<< Result >> Tomato
    BorderColor<< Data >> White
    FontColor<< Result >> Tomato
    FontColor<< Data >> Lightgray
}
partition Database {
    log_data<< Data >>-down>mongoDB
    snmp_data<< Data >>-down>mongoDB
}
mongoDB-down>[mongo_tail]Fluentd
partition MOD #lightgreen{
    others<< Data >>-down>[...]FluentBit
    metrics<< Data >>-down>[cpu]FluentBit
    syslog<< Data >>-down>[file]FluentBit
}
FluentBit->[parsing/filter]FluentBit
FluentBit-down->[forward] Fluentd
Fluentd->[filter/csv]Fluentd
Fluentd-down->[copy]==Routing==
==Routing== -down> SQL
-down>report << Result >>
==Routing== -down> Prometheus
-down>alert << Result >>
```

Related doc:
- [Fluentd](https://hackmd.io/@mcnlab538/B1fyufLQD)
- [FluentBit](https://hackmd.io/@mcnlab538/rJItAPi7P)

___
[TOC]
___

## Environment Setup

```plantuml
skinparam activity {
    BackgroundColor<< Machine >> Lightblue
    BackgroundColor DarkKhaki
    FontColor white
    FontName Aapex
}
partition "MacOS (Local)" {
    MongoDB ->"python script"
    "python script" -> MongoDB
}
partition "server (remote)"{
    partition k8s #lightgray {
        partition Docker #gray {
            Fluentd -> MongoDB
        }
    }
    partition centOS #yellowgreen {
        FluentBit -right> Fluentd
    }
}
MongoDB -up> Fluentd
```

```clike
" On Localhost: ~/.ssh/config "
Host server
    ...
    RemoteForward 0.0.0.0:27017 localhost:27017
Host centOS
    ...
    ProxyJump server
Host docker
    ...
    ProxyJump server
```

```shell
$ # On the Kubernetes controller: port expose setting
$ # (assumes the pod is named "docker")
$ kubectl expose pod docker --type=NodePort --port=24224
$ # kubectl will assign a new port number: PORT
```

```clike
// On pfsense (firewall)
1. Click Firewall -> NAT
2. Under the Port Forward tab, click on the Add button which has an arrow pointed down
3. Change Protocol to TCP
4. Destination Port Range -> Choose (other) and enter 24224
5. Enter the internal IP of your server in the Redirect target IP field
6. Change Redirect target port to Other and enter PORT
7. Add a description and save

// no need to expose port 27017
// the ssh RemoteForward makes the Mongo database reachable on the remote server
// docker only needs to connect to mongodb://server:27017 to access the Mongo database
```

:::warning
kubectl does not run on the Kubernetes VM itself.
The cluster has a controller machine; use that machine to handle the port settings.
:::

## Tests

### Bit 2 D
###### First test on connecting the two machines (centOS and Docker).

The goal is to collect the CPU information of centOS and send it to Docker.
This is accomplished with the "Forward" plugin, which is based on a TCP connection.

FluentBit config:
```r=
[INPUT]
    Name cpu
    Tag  cpu_test
[OUTPUT]
    Name  forward
    Host  server
    Port  24224
    Match *

# Collects cpu information
# Forward the json message to the destination
```

FluentD config:
```r=
<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>
<match *>
  @type stdout
</match>

# Listen to the port
# Print every message to stdout
```

By default, the data is printed in a syslog-like style:
```shell!
$ 2020-09-02 02:24:20.050872843 +0000 bit_test: {"cpu_p":0.0,"user_p":0.0,"system_p":0.0,"cpu0.p_cpu":0.0,"cpu0.p_user":0.0,"cpu0.p_system":0.0,"cpu1.p_cpu":0.0,"cpu1.p_user":0.0,"cpu1.p_system":0.0,"cpu2.p_cpu":0.0,"cpu2.p_user":0.0,"cpu2.p_system":0.0,"cpu3.p_cpu":0.0,"cpu3.p_user":0.0,"cpu3.p_system":0.0,"cpu4.p_cpu":0.0,"cpu4.p_user":0.0,"cpu4.p_system":0.0,"cpu5.p_cpu":0.0,"cpu5.p_user":0.0,"cpu5.p_system":0.0,"cpu6.p_cpu":0.0,"cpu6.p_user":0.0,"cpu6.p_system":0.0,"cpu7.p_cpu":0.0,"cpu7.p_user":0.0,"cpu7.p_system":0.0}
```

To pretty-print the JSON message, install the pretty-JSON formatter plugin and add the format to the match directive.
```shell
$ fluent-gem install fluent-plugin-formatter_pretty_json
```
```r=6
<match *>
  @type stdout
  format pretty_json
</match>
```

Output result:
```jsonld
{
  "cpu_p": 0.0,
  "user_p": 0.0,
  "system_p": 0.0,
  "cpu0.p_cpu": 0.0,
  "cpu0.p_user": 0.0,
  "cpu0.p_system": 0.0,
  "cpu1.p_cpu": 0.0,
  "cpu1.p_user": 0.0,
  "cpu1.p_system": 0.0,
  "cpu2.p_cpu": 0.0,
  "cpu2.p_user": 0.0,
  "cpu2.p_system": 0.0,
  "cpu3.p_cpu": 0.0,
  "cpu3.p_user": 0.0,
  "cpu3.p_system": 0.0,
  "cpu4.p_cpu": 0.0,
  "cpu4.p_user": 0.0,
  "cpu4.p_system": 0.0,
  "cpu5.p_cpu": 0.0,
  "cpu5.p_user": 0.0,
  "cpu5.p_system": 0.0,
  "cpu6.p_cpu": 0.0,
  "cpu6.p_user": 0.0,
  "cpu6.p_system": 0.0,
  "cpu7.p_cpu": 0.0,
  "cpu7.p_user": 0.0,
  "cpu7.p_system": 0.0
}
```

### D 2 Mongo
###### Testing the output function and database integration.

The goal is to output FluentD messages to a NoSQL/SQL database. In this case we use Mongo as an example.
The MongoDB integration plugin can be installed with gem:
```shell
$ fluent-gem install fluent-plugin-mongo
```

FluentD config:
```r=
<match *>
  @type mongo
  database fluent
  replace_dot_in_key_with _
  host server
  port 27017
  collection d_cpu
</match>

# input data from fluentBit
# same as the setting in the previous test
```

:::warning
To connect MongoDB and Docker (and Kubernetes), please refer to the [Environment Setup](https://hackmd.io/KxBEZ5BOSk6FgCqvVef5Jg?both#Environment-Setup)
:::

MongoDB result:
```shell!
$ sudo mongo
---
...
---
> show dbs
admin   0.000GB
config  0.000GB
fluent  0.000GB
> use fluent
switched to db fluent
> show collections
d_cpu
> db.d_cpu.find().limit(1)
{ "_id" : ObjectId("5f4c970be25cb918e215d1f1"), "date" : 1598854853.052838, "cpu_p" : 0, "user_p" : 0, "system_p" : 0, "cpu0_p_cpu" : 0, "cpu0_p_user" : 0, "cpu0_p_system" : 0, "cpu1_p_cpu" : 0, "cpu1_p_user" : 0, "cpu1_p_system" : 0, "cpu2_p_cpu" : 0, "cpu2_p_user" : 0, "cpu2_p_system" : 0, "cpu3_p_cpu" : 0, "cpu3_p_user" : 0, "cpu3_p_system" : 0, "cpu4_p_cpu" : 0, "cpu4_p_user" : 0, "cpu4_p_system" : 0, "cpu5_p_cpu" : 0, "cpu5_p_user" : 0, "cpu5_p_system" : 0, "cpu6_p_cpu" : 0, "cpu6_p_user" : 0, "cpu6_p_system" : 0, "cpu7_p_cpu" : 0, "cpu7_p_user" : 0, "cpu7_p_system" : 0, "time" : ISODate("2020-08-31T06:20:58.068Z") }
```

### Mongo 2 D 2 Mongo
###### Testing queries from the Mongo database

Ideally, we want to track every update in MongoDB.
The mongo_tail plugin is bundled with the mongo plugin, so installing MongoDB support through gem installs both.
The plugin periodically sends a query to the Mongo database based on the last "_id" it read. Any document newer than that "_id" is put into the processing pipeline.

The following Python script keeps updating the database with SNMP information.
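Before the script, the polling idea described above can be sketched in plain Python. This is only an illustration of the technique, not the plugin's actual implementation: the list `fake_collection`, the integer `_id` values (standing in for ObjectIds), and the helper names `poll_new_documents` and `tail` are all assumptions made for this example.

```python
def poll_new_documents(collection, last_id):
    """Return the documents whose _id is greater than last_id, oldest first."""
    newer = [doc for doc in collection if last_id is None or doc["_id"] > last_id]
    return sorted(newer, key=lambda doc: doc["_id"])


def tail(collection, last_id=None):
    """Run one polling round: return the new documents and the updated bookmark."""
    emitted = poll_new_documents(collection, last_id)
    if emitted:
        last_id = emitted[-1]["_id"]
    return emitted, last_id


# Hypothetical in-memory stand-in for the watched collection
fake_collection = [{"_id": 1, "value": "a"}, {"_id": 2, "value": "b"}]

first_batch, bookmark = tail(fake_collection)             # reads everything once
fake_collection.append({"_id": 3, "value": "c"})          # a new document arrives
second_batch, bookmark = tail(fake_collection, bookmark)  # only the new one is emitted

print([d["_id"] for d in first_batch], [d["_id"] for d in second_batch])
```

Only documents inserted after the bookmark are emitted in the second round, which is why a collection that keeps growing can be followed without re-reading old entries. The updater script mentioned above is: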
```python=
import csv
import subprocess
import time

import pymongo  # `collection` below is expected to be a pymongo collection


def MGdbinsert(collection, snmp_oid, timestamp, value, type1):
    # Store one SNMP reading as a document
    content = {"oid": snmp_oid, "time_t": timestamp, "value": value, "type": type1}
    collection.insert_one(content)


def update_init(host_id, collection, sleep_time):
    while True:
        t = time.time()
        # getlist.csv lists the OIDs to walk, one per row (first column)
        with open('getlist.csv', 'r') as csvFile:
            for row in csv.reader(csvFile, delimiter=','):
                if not row:
                    continue
                result = subprocess.run(
                    ["snmpwalk", "-v", "2c", "-c", "public", host_id, row[0]],
                    capture_output=True, text=True,
                ).stdout.splitlines()
                for res in result:
                    try:
                        # Each line looks like "<oid> = <TYPE>: <value>"
                        x = res.split("= ", 1)
                        y = x[1].split(": ", 1)
                        MGdbinsert(collection, row[0], t, y[1], y[0])
                    except IndexError:
                        # Skip lines that do not match the expected format
                        pass
        print("database updated")
        time.sleep(sleep_time)
```

Fluentd config:
```r=
<source>
  @type mongo_tail
  tag snmp_in
  wait_time 10
  time_key time
  # ip and port of MongoDB
  host server
  port 27017
  # specify which db and coll to watch
  database snmp
  collection IN
</source>
<match snmp_in>
  @type mongo
  host server
  port 27017
  database snmp
  collection OUT
</match>

# read from snmp.IN
# write to snmp.OUT
```

Before being read by Fluentd:
```shell!
> use snmp
switched to db snmp
> show collections
IN
> db.IN.findOne({"_id":ObjectId("5f4da4f9c36313e5d1a50e72")})
{
    "_id" : ObjectId("5f4da4f9c36313e5d1a50e72"),
    "oid" : ".1.3.6.1.2.1.1.1.0",
    "time_t" : 1598924025.051392,
    "value" : "Linux pcsns 4.4.0-185-generic #215-Ubuntu SMP Mon Jun 8 21:53:19 UTC 2020 x86_64",
    "type" : "STRING"
}
```

After being read by Fluentd:
```shell!
> show collections
IN
OUT
> db.OUT.findOne({"_id_str":"5f4da4f9c36313e5d1a50e72"})
{
    "_id" : ObjectId("5f4da53be25cb922f3a97276"),
    "oid" : ".1.3.6.1.2.1.1.1.0",
    "time_t" : 1598924025.051392,
    "value" : "Linux pcsns 4.4.0-185-generic #215-Ubuntu SMP Mon Jun 8 21:53:19 UTC 2020 x86_64",
    "type" : "STRING",
    "_id_str" : "5f4da4f9c36313e5d1a50e72",
    "time" : ISODate("2020-09-01T01:33:46.211Z")
}
```

### csv Test
###### Parse any CSV-formatted message

Fluentd uses the tail plugin to track the CSV text, reading each line as a separate event.
In the parse directive, the csv parser matches each field with the given keys (column names).
```r=
<source>
  @type tail
  path /opsws_n_test.txt
  <parse>
    @type csv
    keys col0,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19,col20,col21,col22,col23,col24,col25,col26
  </parse>
  tag csv_test
</source>
<match *>
  @type stdout
</match>
```

Output in stdout:
```shell!
$ 2020-09-03 17:20:31.550659648 +0800 csv_test: {"col0":"2020/08/13:05:58:22","col1":"543647","col2":"MOV00000002251101372","col3":"MOV00000002251101372","col4":"PRE00000002251101372","col5":"逆轉裁判 第01集","col6":"Ace Attorney","col7":"PG-13","col8":"1429","col9":"成步堂龍一,綾里真宵","col10":null,"col11":"第1集【第一次的逆轉】,新人律師──成步堂龍一第一次接下的委託案就是樁殺人案,而且兇手還是自己兒時的好友,堅信好友清白的成步堂決定站上法庭證明朋友不是兇手,但是種種證據卻對朋友相當不利,究竟成步堂能不能找出隱藏的虛偽中的真實,順利打贏這場官司呢?!","col12":"HD","col13":"15988","col14":"/動漫館-卡通199/推理懸疑/逆轉裁判S1","col15":"0","col16":"30:00:00","col17":"2018/12/07:00:00:00","col18":"2021/12/30:23:59:59","col19":"綜合其他","col20":"2251","col21":"0000","col22":"0000","col23":"VOD","col24":"動漫館-卡通199","col25":"推理懸疑","col26":"逆轉裁判S1"}
```

:::danger
The pretty format does not work in this case.
It seems that there is an encoding issue with the plugin.
```shell!
$ 2020-09-04 14:59:06 +0800 [warn]: #0 emit transaction failed: error_class=Encoding::UndefinedConversionError error="\"\\xE9\" from ASCII-8BIT to UTF-8" location="/usr/share/ruby/json/common.rb:286:in `generate'" tag="csv_test"
```
This should be an easy problem to solve. However, it requires modifying an existing plugin.
:::

### Script input Test (FluentD)
###### The challenge here is to separate the printed lines from the script output.

FluentD would treat the entire output of the script as a single message. The goal is to cut it into distinct JSON lines.
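The two-stage idea (first cut the output into lines, then parse each line into fields) can be sketched in Python. This is a rough illustration rather than what Fluentd does internally: the function names are hypothetical, the first sample line reuses the values shown in the output result of this test, and the second sample line is made up.

```python
import json

# Same key names as used for the tab-separated fields in this test
KEYS = ["host", "interval", "timestamp", "cpu_num", "type", "pct"]


def split_into_events(raw_output):
    """Cut the whole command output into one event per non-empty line."""
    return [{"message": line} for line in raw_output.splitlines() if line.strip()]


def parse_tsv_event(event, keys=KEYS):
    """Map the tab-separated fields of event["message"] onto the given keys."""
    return dict(zip(keys, event["message"].split("\t")))


# Sample command output; the second line is invented for illustration
raw = (
    "fedora.lan\t600\t2020-09-03 16:10:25 UTC\tall\t%user\t0.00\n"
    "fedora.lan\t600\t2020-09-03 16:10:25 UTC\tall\t%system\t0.00\n"
)

records = [parse_tsv_event(event) for event in split_into_events(raw)]
for record in records:
    print(json.dumps(record))
```

Each input line ends up as its own JSON record. The configuration for this test expresses the same two steps with a `none` parser in the source and a `tsv` parser in the filter.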
FluentD config:
```r=
<source>
  @type exec
  # get cpu message
  command sadf -p -P ALL -- 1 1
  run_interval 10
  <parse>
    # none type parser would cut each line by \n
    @type none
  </parse>
  tag exec_test
</source>
<filter exec_test>
  @type parser
  key_name message
  <parse>
    # actual parsing job
    @type tsv
    keys host, interval, timestamp, cpu_num, type, pct
  </parse>
</filter>
<match *>
  @type stdout
  format pretty_json
</match>
```

Output result:
```shell
$ {
$   "host": "fedora.lan",
$   "interval": "600",
$   "timestamp": "2020-09-03 16:10:25 UTC",
$   "cpu_num": "all",
$   "type": "%user",
$   "pct": "0.00"
$ }
```

___
[TOC]
___

Created 2020/09/01
Last Modified 2020/09/01
___