Fluent Operator Walkthrough ## Install Fluent Operator (Operator only) ## Setup log sinks - Setup ES - Setup Kafka ## Deploy Fluent Bit and Fluentd - Create Fluent Bit Daemonset using FluentBit CRD - Create Fluentd Deployment using Fluentd CRD ## Fluent Bit Only mode - Using Fluent Bit to collect K8s logs and send to es(Config, Input, Filter, output) - Using Fluent Bit to collect kubelet log ## Fluent Bit + Fluentd mode - Foward K8s log to Fluentd and demostrate Fluentd global crds and tenant crds ## Fluentd only mode - Use fluentd to receive logs from HTTP and send to stdout -------- # fluent-operator-walkthrough - [fluent-operator-walkthrough](#fluent-operator-walkthrough) - [Create Kind Cluster](#create-kind-cluster) - [Startup Log sinks](#startup-log-sinks) - [Startup Kafka Cluster](#startup-kafka-cluster) - [Startup ES Cluster](#startup-es-cluster) - [Install Fluent Operator (Operator only)](#install-fluent-operator-operator-only) - [Deploy Fluent Bit or Fluentd](#deploy-fluent-bit-or-fluentd) - [Create Fluent Bit Daemonset using FluentBit CRD](#create-fluent-bit-daemonset-using-fluentbit-crd) - [Create Fluentd Statefulset using Fluentd CRD](#create-fluentd-statefulset-using-fluentd-crd) - [Fluent Bit Only mode](#fluent-bit-only-mode) - [Using Fluent Bit to collect K8s logs and send to es](#using-fluent-bit-to-collect-k8s-logs-and-send-to-es) - [Using Fluent Bit to collect auditd log](#using-fluent-bit-to-collect-auditd-log) - [Fluent Bit + Fluentd mode](#fluent-bit--fluentd-mode) - [Enable Fluent Bit forward plugin](#enable-fluent-bit-forward-plugin) - [ClusterFluentdConfig: Fluentd cluster-wide configuration](#clusterfluentdconfig-fluentd-cluster-wide-configuration) - [FluentdConfig: Fluentd namespaced-wide configuration](#fluentdconfig-fluentd-namespaced-wide-configuration) - [Combining Fluentd cluster-wide and namespaced-wide configuration](#combining-fluentd-cluster-wide-and-namespaced-wide-configuration) - [Combining Fluentd cluster-wide output and namespace-wide output for the multi-tenant scenario](#combining-fluentd-cluster-wide-output-and-namespace-wide-output-for-the-multi-tenant-scenario) - [Outputing logs to Kafka or Elasticsearch](#outputing-logs-to-kafka-or-elasticsearch) - [Using buffer for Fluentd output](#using-buffer-for-fluentd-output) - [Fluentd only mode](#fluentd-only-mode) - [Use fluentd to receive logs from HTTP and send to stdout](#use-fluentd-to-receive-logs-from-http-and-send-to-stdout) - [How to check the configuration and data](#how-to-check-the-configuration-and-data) ## Create Kind Cluster Following the script "create-kind-cluster.sh" to create a kind cluster named kubesphere,you can change the cluster name as what you wanted. ## Startup Log sinks ### Startup Kafka Cluster Following the script "deploy-kafka.sh" to deploy a kafka cluster. ### Startup ES Cluster Following the script "deploy-es.sh" to deploy a es cluster. ## Install Fluent Operator (Operator only) Following the script "deploy-fluent-operator-raw.sh" to deploy a raw Fluent Operator deployment. The Fluent Operator works through the defined CRDs. The orchestration engine can start Fluent Bit instances or Fluentd instances and use Secret Object to mount configuration files on which the instances run. You can find a detailed introduction to CRD at the link below: - https://github.com/fluent/fluent-operator#fluent-bit - https://github.com/fluent/fluent-operator#fluentd ## Deploy Fluent Bit or Fluentd ### Create Fluent Bit Daemonset using FluentBit CRD ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentbit.fluent.io/v1alpha2 kind: FluentBit metadata: name: fluent-bit namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluent-bit spec: image: kubesphere/fluent-bit:v1.8.3 positionDB: hostPath: path: /var/lib/fluent-bit/ resources: requests: cpu: 10m memory: 25Mi limits: cpu: 500m memory: 200Mi fluentBitConfigName: fluent-bit-config tolerations: - operator: Exists affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-role.kubernetes.io/edge operator: DoesNotExist EOF ``` ### Create Fluentd Statefulset using Fluentd CRD ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 3 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" EOF ``` ## Fluent Bit Only mode ### Using Fluent Bit to collect K8s logs and send to es ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentbit.fluent.io/v1alpha2 kind: FluentBit metadata: name: fluent-bit namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluent-bit spec: image: kubesphere/fluent-bit:v1.8.3 positionDB: hostPath: path: /var/lib/fluent-bit/ resources: requests: cpu: 10m memory: 25Mi limits: cpu: 500m memory: 200Mi fluentBitConfigName: fluent-bit-config tolerations: - operator: Exists affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-role.kubernetes.io/edge operator: DoesNotExist --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterFluentBitConfig metadata: name: fluent-bit-config labels: app.kubernetes.io/name: fluent-bit spec: service: parsersFile: parsers.conf inputSelector: matchLabels: fluentbit.fluent.io/enabled: "true" filterSelector: matchLabels: fluentbit.fluent.io/enabled: "true" outputSelector: matchLabels: fluentbit.fluent.io/enabled: "true" --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterInput metadata: name: docker labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: systemd: tag: service.docker path: /var/log/journal db: /fluent-bit/tail/docker.db dbSync: Normal systemdFilter: - _SYSTEMD_UNIT=docker.service --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterInput metadata: name: kubelet labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: systemd: tag: service.kubelet path: /var/log/journal db: /fluent-bit/tail/kubelet.db dbSync: Normal systemdFilter: - _SYSTEMD_UNIT=kubelet.service --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterInput metadata: name: tail labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: tail: tag: kube.* path: /var/log/containers/*.log parser: docker refreshIntervalSeconds: 10 memBufLimit: 5MB skipLongLines: true db: /fluent-bit/tail/pos.db dbSync: Normal --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterFilter metadata: name: systemd labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: match: service.* filters: - lua: script: key: systemd.lua name: fluent-bit-lua call: add_time timeAsTable: true --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterFilter metadata: name: kubernetes labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: match: kube.* filters: - kubernetes: kubeURL: https://kubernetes.default.svc:443 kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token labels: false annotations: false - nest: operation: lift nestedUnder: kubernetes addPrefix: kubernetes_ - modify: rules: - remove: stream - remove: kubernetes_pod_id - remove: kubernetes_host - remove: kubernetes_container_hash - nest: operation: nest wildcard: - kubernetes_* nestUnder: kubernetes removePrefix: kubernetes_ --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterOutput metadata: name: es labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: matchRegex: (?:kube|service)\.(.*) es: host: elasticsearch-master.elastic.svc port: 9200 generateID: true logstashPrefix: ks-logstash-log logstashFormat: true timeKey: "@timestamp" --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterOutput metadata: name: kafka labels: fluentbit.fluent.io/enabled: "false" fluentbit.fluent.io/component: logging spec: matchRegex: (?:kube|service)\.(.*) kafka: brokers: my-cluster-kafka-bootstrap.kafka.svc:9091,my-cluster-kafka-bootstrap.kafka.svc:9092,my-cluster-kafka-bootstrap.kafka.svc:9093 topics: ks-log EOF ``` Within a couple of minutes, you should check the results in ES cluster or KAFKA cluster. > you can refer the following docs to check results. ### Using Fluent Bit to collect auditd log ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentbit.fluent.io/v1alpha2 kind: FluentBit metadata: name: fluent-bit namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluent-bit spec: image: kubesphere/fluent-bit:v1.8.3 positionDB: hostPath: path: /var/lib/fluent-bit/ resources: requests: cpu: 10m memory: 25Mi limits: cpu: 500m memory: 200Mi fluentBitConfigName: fluent-bit-config tolerations: - operator: Exists affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: node-role.kubernetes.io/edge operator: DoesNotExist --- apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterFluentBitConfig metadata: name: fluent-bit-config labels: app.kubernetes.io/name: fluent-bit spec: inputSelector: matchLabels: fluentbit.fluent.io/enabled: "true" filterSelector: matchLabels: fluentbit.fluent.io/enabled: "true" outputSelector: matchLabels: fluentbit.fluent.io/enabled: "true" --- apiVersion: fluentbit.fluent.io kind: ClusterInput metadata: name: auditd-input labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: tail: tag: auditd path: /var/log/audit/audit.log refreshIntervalSeconds: 10 memBufLimit: 5MB db: /tail/auditd.db dbSync: Normal --- apiVersion: fluentbit.fluent.io kind: ClusterFilter metadata: name: filter-audit-logs labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: match: auditd filters: - recordModifier: records: - node_name ${NODE_NAME} - lua: script: key: auditd.lua name: fluent-bit-auditd-config call: cb_replace timeAsTable: true --- apiVersion: v1 kind: ConfigMap metadata: name: fluent-bit-auditd-config namespace: kubesphere-logging-system data: auditd.lua: | function cb_replace(tag, timestamp, record) if (record["log"] == nil) then return 0, 0, 0 end local new_record = {} timeStr = os.date("!*t", timestamp["sec"]) t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ", timeStr["year"], timeStr["month"], timeStr["day"], timeStr["hour"], timeStr["min"], timeStr["sec"], timestamp["nsec"]) kubernetes = {} kubernetes["pod_name"] = record["node_name"] kubernetes["container_name"] = "auditd" kubernetes["namespace_name"] = "kube-system" new_record["time"] = t new_record["log"] = record["log"] new_record["kubernetes"] = kubernetes return 1, timestamp, new_record end --- apiVersion: fluentbit.fluent.io kind: ClusterOutput metadata: name: auditd-to-es labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: matchRegex: auditd es: host: elasticsearch-master.elastic.svc port: 9200 generateID: true logstashPrefix: ks-logstash-log logstashFormat: true timeKey: "@timestamp" EOF ``` > Before you applying the manifests, please make the former yamls removed. Within a couple of minutes, you should check the results in ES cluster. > you can refer the following docs to check results. ## Fluent Bit + Fluentd mode Fluentd acts as a log forward layer that receives logs from Fluent Bit. ### Enable Fluent Bit forward plugin At first, you should enable the forward plugin in Fluent Bit to send logs to Fluentd. ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentbit.fluent.io/v1alpha2 kind: ClusterOutput metadata: name: fluentd labels: fluentbit.fluent.io/enabled: "true" fluentbit.fluent.io/component: logging spec: matchRegex: (?:kube|service)\.(.*) forward: host: fluentd.kubesphere-logging-system.svc port: 24224 EOF ``` > Before you applying the manifests, please make the former yamls removed. And secondly, Fluentd also needs to use the forward input plugin to receive these input logs. This part has been combined into the following examples. ### ClusterFluentdConfig: Fluentd cluster-wide configuration ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-monitoring-system clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-es labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log EOF ``` > Before you applying the manifests, please make the former yamls removed. ### FluentdConfig: Fluentd namespaced-wide configuration ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: FluentdConfig metadata: name: fluentd-config namespace: kubesphere-logging-system labels: config.fluentd.fluent.io/enabled: "true" spec: outputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: Output metadata: name: fluentd-output-es namespace: kubesphere-logging-system labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log EOF ``` > Before you applying the manifests, please make the former yamls removed. ### Combining Fluentd cluster-wide and namespaced-wide configuration ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-monitoring-system clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: FluentdConfig metadata: name: fluentd-config namespace: kubesphere-logging-system labels: config.fluentd.fluent.io/enabled: "true" spec: clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-es labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log EOF ``` > Before you applying the manifests, please make the former yamls removed. ### Combining Fluentd cluster-wide output and namespace-wide output for the multi-tenant scenario ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: FluentdConfig metadata: name: fluentd-config-user1 namespace: kubesphere-logging-system labels: config.fluentd.fluent.io/enabled: "true" spec: outputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/user: "user1" clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/role: "log-operator" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config-cluster labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-system clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/scope: "cluster" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: Output metadata: name: fluentd-output-user1 namespace: kubesphere-logging-system labels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/user: "user1" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log-user1 --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-log-operator labels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/role: "log-operator" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log-operator --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-cluster labels: output.fluentd.fluent.io/enabled: "true" output.fluentd.fluent.io/scope: "cluster" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log EOF ``` > Before you applying the manifests, please make the former yamls removed. ### Outputing logs to Kafka or Elasticsearch ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-monitoring-system clusterFilterSelector: matchLabels: filter.fluentd.fluent.io/enabled: "true" clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFilter metadata: name: fluentd-filter labels: filter.fluentd.fluent.io/enabled: "true" spec: filters: - recordTransformer: enableRuby: true records: - key: kubernetes_ns value: ${record["kubernetes"]["namespace_name"]} --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-kafka labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - kafka: brokers: my-cluster-kafka-bootstrap.default.svc:9091,my-cluster-kafka-bootstrap.default.svc:9092,my-cluster-kafka-bootstrap.default.svc:9093 useEventTime: true topicKey: kubernetes_ns EOF ``` > Before you applying the manifests, please make the former yamls removed. ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-monitoring-system clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output-es labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log EOF ``` > Before you applying the manifests, please make the former yamls removed. ### Using buffer for Fluentd output ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - forward: bind: 0.0.0.0 port: 24224 replicas: 3 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFluentdConfig metadata: name: fluentd-config labels: config.fluentd.fluent.io/enabled: "true" spec: watchedNamespaces: - kube-system - kubesphere-monitoring-system clusterFilterSelector: matchLabels: filter.fluentd.fluent.io/enabled: "true" clusterOutputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterFilter metadata: name: fluentd-filter labels: filter.fluentd.fluent.io/enabled: "true" spec: filters: - recordTransformer: enableRuby: true records: - key: kubernetes_ns value: ${record["kubernetes"]["namespace_name"]} --- apiVersion: fluentd.fluent.io/v1alpha1 kind: ClusterOutput metadata: name: fluentd-output labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - stdout: {} buffer: type: file path: /buffers/stdout.log - elasticsearch: host: elasticsearch-master.elastic.svc port: 9200 logstashFormat: true logstashPrefix: ks-logstash-log buffer: type: file path: /buffers/es.log EOF ``` > Before you applying the manifests, please make the former yamls removed. ## Fluentd only mode ### Use fluentd to receive logs from HTTP and send to stdout ```shell cat <<EOF | kubectl apply -f - apiVersion: fluentd.fluent.io/v1alpha1 kind: Fluentd metadata: name: fluentd-http namespace: kubesphere-logging-system labels: app.kubernetes.io/name: fluentd spec: globalInputs: - http: bind: 0.0.0.0 port: 9880 replicas: 1 image: kubesphere/fluentd:v1.14.4 fluentdCfgSelector: matchLabels: config.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: FluentdConfig metadata: name: fluentd-config namespace: kubesphere-logging-system labels: config.fluentd.fluent.io/enabled: "true" spec: filterSelector: matchLabels: filter.fluentd.fluent.io/enabled: "true" outputSelector: matchLabels: output.fluentd.fluent.io/enabled: "true" --- apiVersion: fluentd.fluent.io/v1alpha1 kind: Filter metadata: name: fluentd-filter namespace: kubesphere-logging-system labels: filter.fluentd.fluent.io/enabled: "true" spec: filters: - stdout: {} --- apiVersion: fluentd.fluent.io/v1alpha1 kind: Output metadata: name: fluentd-stdout namespace: kubesphere-logging-system labels: output.fluentd.fluent.io/enabled: "true" spec: outputs: - stdout: {} EOF ``` > Before you applying the manifests, please make the former yamls removed. Then you can send logs to the endpoint by using curl: ``` curl -X POST -d 'json={"foo":"bar"}' http://<localhost:9880>/app.log ``` > replace the <localhost:9880> to the actual service or endpoint of fluentd, i.e: fluentd-http.kubesphere-logging-system.svc:9880 ## How to check the configuration and data 1. See the state of the Fluent Operator: ```bash kubectl get po -n kubesphere-logging-system ``` 2. See the generated fluent bit configuration: ```bash kubectl -n kubesphere-logging-system get secrets fluent-bit-config -ojson | jq '.data."fluent-bit.conf"' | awk -F '"' '{printf $2}' | base64 --decode [Service] Parsers_File parsers.conf [Input] Name systemd Path /var/log/journal DB /fluent-bit/tail/docker.db DB.Sync Normal Tag service.docker Systemd_Filter _SYSTEMD_UNIT=docker.service [Input] Name systemd Path /var/log/journal DB /fluent-bit/tail/kubelet.db DB.Sync Normal Tag service.kubelet Systemd_Filter _SYSTEMD_UNIT=kubelet.service [Input] Name tail Path /var/log/containers/*.log Refresh_Interval 10 Skip_Long_Lines true DB /fluent-bit/tail/pos.db DB.Sync Normal Mem_Buf_Limit 5MB Parser docker Tag kube.* [Filter] Name lua Match kube.* script /fluent-bit/config/containerd.lua call containerd time_as_table true [Filter] Name kubernetes Match kube.* Kube_URL https://kubernetes.default.svc:443 Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token Labels false Annotations false [Filter] Name nest Match kube.* Operation lift Nested_under kubernetes Add_prefix kubernetes_ [Filter] Name modify Match kube.* Remove stream Remove kubernetes_pod_id Remove kubernetes_host Remove kubernetes_container_hash [Filter] Name nest Match kube.* Operation nest Wildcard kubernetes_* Nest_under kubernetes Remove_prefix kubernetes_ [Filter] Name lua Match service.* script /fluent-bit/config/systemd.lua call add_time time_as_table true [Output] Name forward Match_Regex (?:kube|service)\.(.*) Host fluentd.kubesphere-logging-system.svc Port 24224 ``` 3. See the generated fluentd configuration: ```bash kubectl -n kubesphere-logging-system get secrets fluentd-config -ojson | jq '.data."app.conf"' | awk -F '"' '{printf $2}' | base64 --decode --- <source> @type forward bind 0.0.0.0 port 24224 </source> <match **> @id main @type label_router <route> @label @48b7cb809bc2361ba336802a95eca0d4 <match> namespaces kube-system,kubesphere-monitoring-system </match> </route> </match> <label @48b7cb809bc2361ba336802a95eca0d4> <filter **> @id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::fluentd-filter-0 @type record_transformer enable_ruby true <record> kubernetes_ns ${record["kubernetes"]["namespace_name"]} </record> </filter> <match **> @id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-0 @type elasticsearch host elasticsearch-master.elastic.svc logstash_format true logstash_prefix ks-logstash-log port 9200 </match> <match **> @id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-kafka-0 @type kafka2 brokers my-cluster-kafka-bootstrap.kafka.svc:9091,my-cluster-kafka-bootstrap.kafka.svc:9092,my-cluster-kafka-bootstrap.kafka.svc:9093 topic_key kubernetes_ns use_event_time true <format> @type json </format> </match> </label> ``` 4. Query the elastic cluster kubernetes_ns buckets: ```bash kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl -X GET "localhost:9200/ks-logstash*/_search?pretty" -H 'Content-Type: application/json' -d '{ "size" : 0, "aggs" : { "kubernetes_ns": { "terms" : { "field": "kubernetes.namespace_name.keyword" } } } }' ``` > If you don't use fluentd to extract the namspace field, you can use other query API. 5. Check the kafka cluster: ```bash # kubectl -n kafka exec -it my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic <namespace or ks-log> ``` > Replace <namespace or ks-log> to the actual topic.