51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Kubernetes集群监控-Kafka对接EL(Fluentd)K构建超强日志体系

虚拟化运维 Log Kubernetes

Kubernetes集群监控-Kafka对接EL(Fluentd)K构建超强日志体系 {#CrawlerTitle}

王先森 2024-12-18 2024-12-18

简介 {#简介}

Kafka 在日志体系中的核心作用 {#Kafka-在日志体系中的核心作用}

在当今复杂的分布式系统环境下,日志管理变得愈发重要,而 Kafka 在其中扮演着极为关键的角色。对于大型系统,特别是基于微服务设计的分布式系统而言,日志来源众多,数据量庞大。如果直接将大量日志通过传统方式收集输出至存储或分析工具,很有可能会导致诸如 Elasticsearch 等工具崩溃,这时候就凸显出 Kafka 的价值了。

Kafka 具有出色的分布式扩容能力,能够轻松应对不断增长的日志数据量。在高并发场景下,它的优势更是明显,例如在面对大量微服务同时产生日志并需要处理的情况时,依然可以稳定地接收和暂存这些日志信息,起到了很好的缓冲作用。而且,它采用发布 - 订阅的消息模式,消息生产者(也就是各个产生日志的应用、服务等)将日志消息发布到 Kafka 的主题 (Topic) 中,多个消息消费者(比如后续对接的用于日志处理、分析等的工具)可以同时订阅这些主题来消费消息,实现了一对多的数据流转模式,方便不同环节的组件对日志进行相应处理,为整个日志体系搭建起了可靠的数据流转基础。

架构图

EL (Fluentd) K 的组合优势 {#EL-Fluentd-K-的组合优势}

高效的日志收集与处理 {#高效的日志收集与处理}

Fluentd 在日志收集阶段发挥重要作用。Fluentd 具有丰富的插件系统,能够轻松地从各种数据源(如服务器、容器等)收集日志。无论是文本文件、系统日志还是网络数据,Fluentd 都能有效地进行采集,确保日志数据不会遗漏。

强大的日志缓冲能力 {#强大的日志缓冲能力}

Kafka 在图中处于日志缓冲的位置。Kafka 是一个高吞吐量、分布式的消息队列系统。它能够在短时间内处理大量的日志数据,并将其进行缓存。这一特性使得在日志高峰期,系统不会因为数据量过大而出现堵塞或者数据丢失的情况,保证了日志数据的完整性和及时性。

灵活的日志转换 {#灵活的日志转换}

Logstash 在图中负责日志转换。Logstash 可以对从 Kafka 接收到的日志数据进行灵活的转换操作。例如,它可以对日志格式进行标准化处理,将不同格式的日志转换为统一的、便于存储和分析的格式。同时,还可以进行数据过滤、字段提取等操作,让日志数据更加符合实际分析需求。

可靠的日志存储 {#可靠的日志存储}

Elasticsearch 作为日志存储组件,具有出色的分布式存储和搜索能力。它能够将经过处理的日志数据进行可靠的存储,并且提供快速的全文搜索功能。无论是从海量的日志数据中查找特定事件,还是进行复杂的数据分析,Elasticsearch 都能够高效地满足需求。

直观的日志呈现 {#直观的日志呈现}

Kibana 在图中的最后环节负责日志呈现。Kibana 可以与 Elasticsearch 紧密集成,通过直观的可视化界面将存储在 Elasticsearch 中的日志数据进行展示。用户可以轻松地创建各种图表、仪表盘,从不同维度对日志数据进行分析和监控,快速发现系统中的潜在问题。

综上所述,EL (Fluentd) K 的组合通过各个组件的协同工作,形成了一个从日志收集、缓冲、转换、存储到呈现的完整、高效且可靠的日志处理体系。

搭建 Kafka 集群 {#搭建-Kafka-集群}

Kafka 最初由 Linkedin 公司开发,是一款分布式、分区化、多副本且多订阅者的系统,依托 zookeeper 协调运作,既可用作分布式日志系统,也能当作 MQ 系统,在 web/nginx 日志处理、访问日志管理以及消息服务等领域应用广泛。2010 年,Linkedin 将其贡献给 Apache 基金会,使之成为顶级开源项目。其主要应用场景集中在日志收集系统与消息系统两大方面。

安装 jdk 依赖 {#安装-jdk-依赖}

首先,我们需要安装 jdk 依赖。前往 jdk 下载地址: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 下载合适版本的 jdk。

在服务器上执行以下操作:

|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 | [root@kafka-server ~]# cd /server/tools # 安装 jdk 软件包,此处以 jdk-8u431-linux-x64.rpm 为例 [root@k8s-master tools]# rpm -ivh jdk-8u431-linux-x64.rpm # 检查 jdk 安装是否成功 [root@k8s-master tools]# java -version java version "1.8.0_431" Java(TM) SE Runtime Environment (build 1.8.0_431-b11) Java HotSpot(TM) 64-Bit Server VM (build 25.431-b11, mixed mode) |

安装 Kafka {#安装-Kafka}

完成 jdk 安装后,开始安装 Kafka。

|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 | [root@kafka-server ~]# cd /server/tools # 下载 Kafka 安装包,这里以 kafka_2.12-3.6.2.tgz 为例 [root@kafka-server tools]# wget https://archive.apache.org/dist/kafka/3.6.2/kafka_2.12-3.6.2.tgz # 解压 Kafka 安装包到 /opt/ 目录 [root@kafka-server tools]# tar xf kafka_2.12-3.6.2.tgz.tgz -C /opt/ # 创建软链接,方便后续操作 [root@kafka-server tools]# ln -s /opt/kafka_2.12-3.6.2.tgz /opt/kafka |

配置 Kafka {#配置-Kafka}

安装完成后,对 Kafka 进行配置。

|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | [root@kafka-server kafka]# vim /opt/kafka/config/server.properties # 设置 Kafka 日志存储目录 log.dirs=/data/kafka/logs # 指定 zookeeper 连接地址 zookeeper.connect=localhost:2181 # 设置日志刷新消息间隔 log.flush.interval.messages=10000 # 设置日志刷新时间间隔 log.flush.interval.ms=1000 # 启用删除主题功能 delete.topic.enable=true # 设置 Kafka 服务器主机名 host.name=kafka-server.boysec.cn [root@kafka-server kafka]# mkdir -p /data/kafka/logs |

启动 Kafka {#启动-Kafka}

最后,启动 Kafka 服务。

|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 | [root@k8s-slave kafka]# /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties [root@k8s-slave kafka]# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties # 检查 Kafka 服务是否成功启动,查看 9092 端口是否监听 [root@k8s-slave kafka]# netstat -lntup |grep 9092 tcp6 0 0 10.1.1.130:9092 :::* LISTEN 103655/java |

通过以上步骤,我们完成了 Kafka 的安装、配置与启动,为后续构建强大的日志体系奠定了坚实的基础。在实际应用中,可根据具体需求进一步优化 Kafka 的各项参数设置,以实现更高效稳定的运行。

验证Kafka {#验证Kafka}

|------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | # 创建topic /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 10.1.1.130:9092 --replication-factor 1 --partitions 3 --topic cola # 查看topic /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.1.1.130:9092 # 本地测试--发送消息 /opt/kafka/bin/kafka-console-producer.sh --broker-list 10.1.1.130:9092 --topic cola # 本地测试---查看接收消息 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.130:9092 --topic cola --from-beginning # 查看特定主题详细信息 /opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.1.1.130:9092 --describe --topic cola |

Kafka-Manager(可选安装) {#Kafka-Manager-可选安装}

Kafka-Manager 是雅虎开源的一款用于管理 Apache Kafka 集群的工具。

Github 地址https://github.com/yahoo/CMAK。

功能特点 {#功能特点}

  • 集群管理
    • 支持对多个集群进行管理。
    • 能够轻松查看集群状态,包括主题、消费者、偏移量、代理(brokers)、副本分布、分区分布等信息。

方法一(废弃) {#方法一-废弃}

|------------------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | FROM hseeberger/scala-sbt ENV ZK_HOSTS=10.1.1.130:2181 \ KM_VERSION=2.0.0.2 RUN mkdir -p /tmp && \ cd /tmp && \ wget https://github.com/yahoo/kafka-manager/archive/${KM_VERSION}.tar.gz && \ tar xf ${KM_VERSION}.tar.gz && \ cd /tmp/CMAK-${KM_VERSION} && \ sbt clean dist && \ unzip -d / ./target/universal/kafka-manager-${KM_VERSION}.zip && \ rm -fr /tmp/${KM_VERSION} /tmp/kafka-manager-${KM_VERSION} WORKDIR /kafka-manager-${KM_VERSION} EXPOSE 9000 ENTRYPOINT ["./bin/kafka-manager","-Dconfig.file=conf/application.conf"] |

构建镜像: docker build . -t harbor.od.com/infra/kafka-manager:v2.0.0.2

方法二 {#方法二}

注意事项:
在此方法中,我们可直接下载镜像来部署 Kafka-Manager。值得一提的是,本人已制作了 3.0.0.53.0.0.6 版本的 kafaka-manager 镜像,这些镜像均可直接用于部署操作,极大地简化了部署流程。

资源配置清单相关操作:
首先,我们需要在服务器上创建用于存放 Kafka-Manager 的 Kubernetes 资源配置清单的目录,执行以下命令:

|-----------|----------------------------------------------| | 1 | mkdir -p /var/k8s-yaml/kafka-manager |

此目录将用于存放后续创建 Kubernetes 资源(如 Deployment、Service 等)所需的 yaml 文件,以便更好地组织和管理与 Kafka-Manager 部署相关的资源配置信息,使整个部署过程更加清晰、有序且易于维护。
Deployment Service Ingress

vim /var/k8s-yaml/kafka-manager/deployment.yaml

|---------------------------------------------------------------------------------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | kind: Deployment apiVersion: apps/v1 metadata: name: kafka-manager namespace: logging labels: name: kafka-manager spec: replicas: 1 selector: matchLabels: app: kafka-manager strategy: type: RollingUpdate rollingUpdate: maxUnavailable: 1 maxSurge: 1 revisionHistoryLimit: 7 progressDeadlineSeconds: 600 template: metadata: labels: app: kafka-manager spec: containers: - name: kafka-manager image: wangxiansen/kafka-manager:debian-v3.0.0.6 imagePullPolicy: IfNotPresent ports: - containerPort: 9000 protocol: TCP env: - name: ZK_HOSTS value: 10.1 .1 .10 :2181 - name: APPLICATION_SECRET value: letmein terminationGracePeriodSeconds: 30 securityContext: runAsUser: 0 |

vim /var/k8s-yaml/kafka-manager/svc.yaml

|------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 | kind: Service apiVersion: v1 metadata: name: kafka-manager namespace: logging spec: ports: - protocol: TCP port: 9000 targetPort: 9000 selector: app: kafka-manager |

vim /var/k8s-yaml/kafka-manager/ingress.yaml

|------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | apiVersion: traefik.containo.us/v1alpha1 kind: IngressRoute metadata: name: kafka-manager-web namespace: logging spec: entryPoints: - web routes: - match: Host(`km.od.com`) kind: Rule services: - name: kafka-manager port: 9000 |

应用配置清单:

|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 | kubectl apply -f /var/k8s-yaml/kafka-manager/deployment.yaml kubectl apply -f /var/k8s-yaml/kafka-manager/svc.yaml kubectl apply -f /var/k8s-yaml/kafka-manager/ingress.yaml |

访问: http://km.od.com

添加集群

集群状态

Fluentd 配置 Kafka {#Fluentd-配置-Kafka}

现在有了 Kafka,我们就可以将 Fluentd 的日志数据输出到 Kafka 了,只需要将 Fluentd 配置中的 <match> 更改为使用 Kafka 插件即可,但是在 Fluentd 中输出到 Kafka,需要使用到 fluent-plugin-kafka 插件,所以需要我们自定义下 Docker 镜像,最简单的做法就是在上面 Fluentd 镜像的基础上新增 kafka 插件即可,Dockerfile 文件如下所示:

|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 | FROM quay.io/fluentd_elasticsearch/fluentd:v5 RUN echo "source 'https://mirrors.tuna.tsinghua.edu.cn/rubygems/'" > Gemfile && gem install bundler RUN gem install fluent-plugin-kafka --no-document |

使用上面的 Dockerfile 文件构建一个 Docker 镜像即可,我这里构建过后的镜像名为 wangxiansen/fluentd_kafka:v5
ConfigMap DaemonSet RBAC

vim /var/k8s-yaml/fluentd/configmap.yaml

|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | kind: ConfigMap apiVersion: v1 metadata: name: fluentd-conf namespace: logging data: # 容器日志 containers.input.conf: |- <source> @id fluentd-containers.log @type tail # Fluentd 内置的输入方式,其原理是不停地从源文件中获取新的日志 path /var/log/containers/*.log # Docker 容器日志路径 pos_file /var/log/containers.log.pos # 记录读取的位置 tag raw.kubernetes.* # 设置日志标签 read_from_head true # 从头读取 <parse> # 多行格式化成JSON # 可以使用我们介绍过的 multiline 插件实现多行日志 @type multi_format # 使用 multi-format-parser 解析器插件 <pattern> format json # JSON解析器 time_key time # 指定事件时间的时间字段 time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式 </pattern> <pattern> format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/ time_format %Y-%m-%dT%H:%M:%S.%N%:z </pattern> </parse> </source> # 在日志输出中检测异常(多行日志),并将其作为一条日志转发 # https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions <match raw.kubernetes.**> # 匹配tag为raw.kubernetes.**日志信息 @id raw.kubernetes @type detect_exceptions # 使用detect-exceptions插件处理异常栈信息 remove_tag_prefix raw # 移除 raw 前缀 message log stream stream multiline_flush_interval 5 </match> <filter **> # 拼接日志 @id filter_concat @type concat # Fluentd Filter 插件,用于连接多个日志中分隔的多行日志 key message multiline_end_regexp /\n$/ # 以换行符"\n"拼接 separator "" </filter> # 添加 Kubernetes metadata 数据 <filter kubernetes.**> @id filter_kubernetes_metadata @type kubernetes_metadata </filter> # 修复 ES 中的 JSON 字段 # 插件地址:https://github.com/repeatedly/fluent-plugin-multi-format-parser <filter kubernetes.**> @id filter_parser @type parser # multi-format-parser多格式解析器插件 key_name log # 在要解析的日志中指定字段名称 reserve_data true # 在解析结果中保留原始键值对 remove_key_name_field true # key_name 解析成功后删除字段 <parse> @type multi_format <pattern> format json </pattern> <pattern> format none </pattern> </parse> </filter> # 删除一些多余的属性 <filter kubernetes.**> @type record_transformer remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash </filter> # 只保留具有logging=true标签的Pod日志 <filter kubernetes.**> @id filter_log @type grep <regexp> key $.kubernetes.labels.logging pattern ^true$ </regexp> </filter> <filter **> @type stdout </filter> # ##### 监听配置,一般用于日志聚合用 ###### forward.input.conf: |- # 监听通过TCP发送的消息 <source> @id forward @type forward </source> output.conf: |- <match **> @id kafka @type kafka @log_level info # list of seed brokers brokers "#{ENV['FLUENT_KAFKA_BROKERS'] || 'localhost:9092'}" use_event_time true # topic settings 默认topic是messages topic_key k8slog default_topic "#{ENV['FLUENT_KAFKA_TOPIC'] || 'messages'}" # data type settings <format> @type json </format> # producer settings required_acks -1 compression_codec gzip </match> |

vim /var/k8s-yaml/fluentd/daemonset.yaml

|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | apiVersion: apps/v1 kind: DaemonSet metadata: name: fluentd namespace: logging labels: app: fluentd kubernetes.io/cluster-service: 'true' spec: selector: matchLabels: app: fluentd template: metadata: labels: app: fluentd kubernetes.io/cluster-service: 'true' spec: tolerations: - operator: Exists serviceAccountName: fluentd-es containers: - name: fluentd image: wangxiansen/fluentd_kafka:v5 env: - name: K8S_NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName - name: FLUENT_KAFKA_BROKERS value: "10.1.1.10:9092" - name: FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX value: "k8s" resources: limits: cpu: 100m memory: 400Mi requests: cpu: 50m memory: 300Mi volumeMounts: - name: fluentconfig mountPath: /etc/fluent/config.d - name: varlog mountPath: /var/log - name: dockerlog mountPath: /var/log/containers/ # mountPath: /data/docker/containers volumes: - name: fluentconfig configMap: name: fluentd-conf - name: varlog hostPath: path: /var/log - name: dockerlog hostPath: path: /var/log/containers/ # path: /data/docker/containers |

vim /var/k8s-yaml/fluentd/rbac.yaml

|------------------------------------------------------------------------------------------------------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | apiVersion: v1 kind: ServiceAccount metadata: name: fluentd-es namespace: logging labels: k8s-app: fluentd-es kubernetes.io/cluster-service: 'true' addonmanager.kubernetes.io/mode: Reconcile --- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: 'true' addonmanager.kubernetes.io/mode: Reconcile rules: - apiGroups: - '' resources: - 'namespaces' - 'pods' verbs: - 'get' - 'watch' - 'list' --- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: fluentd-es labels: k8s-app: fluentd-es kubernetes.io/cluster-service: 'true' addonmanager.kubernetes.io/mode: Reconcile subjects: - kind: ServiceAccount name: fluentd-es namespace: logging apiGroup: '' roleRef: kind: ClusterRole name: fluentd-es apiGroup: '' |

应用配置清单:

|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 | kubectl apply -f /var/k8s-yaml/fluentd/rbac.yaml kubectl apply -f /var/k8s-yaml/fluentd/configmap.yaml kubectl apply -f /var/k8s-yaml/fluentd/daemonset.yaml |

Logstash 精细化数据加工与中转 {#Logstash-精细化数据加工与中转}

虽然数据从 Kafka 到 Elasticsearch 的方式多种多样,比如可以使用 Kafka Connect Elasticsearch Connector 来实现,我们这里还是采用更加流行的 Logstash 方案,上面我们已经将日志从 Fluentd 采集输出到 Kafka 中去了,接下来我们使用 Logstash 来连接 Kafka 与 Elasticsearch 间的日志数据。

|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | --- apiVersion: storage.k8s.io/v1 kind: StorageClass # 创建存储动态供给 metadata: name: local-storage provisioner: kubernetes.io/no-provisioner volumeBindingMode: WaitForFirstConsumer --- apiVersion: v1 kind: PersistentVolume # 创建pv metadata: name: logstash-local labels: pv-type: logstash-local-pv spec: accessModes: - ReadWriteOnce capacity: storage: 5Gi storageClassName: local-storage local: path: /data/nfs-volume/logstash nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: kubernetes.io/hostname operator: In values: - k8s-node01.local --- apiVersion: logstash.k8s.elastic.co/v1alpha1 kind: Logstash metadata: name: logstash namespace: logging spec: version: 8.15 .1 count: 1 elasticsearchRefs: - clusterName: k8s name: elasticsearch namespace: logging volumeClaimTemplates: - metadata: name : logstash-data spec: storageClassName: local-storage accessModes: - ReadWriteOnce selector: matchLabels: pv-type: logstash-local-pv resources: requests: storage: 2G pipelines: - pipeline.id: main config.string: | input { kafka { bootstrap_servers => "10.1.1.10:9092" consumer_threads => 4 group_id => "test_log" client_id => "test_log" # 注意: 多台logstash实例消费同一个topics时; client_id需要指定不同的名字 topics_pattern => "messages" # 要与fluentd中配置的topic一直 } } filter { json { source => "message" } } output { elasticsearch { hosts => [ "${ELASTICSEARCH_HOSTS}" ] user => "${ELASTICSEARCH_USER}" password => "${ELASTICSEARCH_PASSWORD}" ssl_verification_mode => 'none' # 索引是根据k8s的namespace区分的 index => "k8s-logstash-%{[kubernetes][namespace_name]}-%{+YYYY.MM.dd}" } # 日志输出的标准输出显示 stdout { codec => rubydebug } } podTemplate: spec: nodeSelector: logstash: "true" containers: - name: logstash env: - name: ELASTICSEARCH_HOSTS value: "https://elasticsearch-es-http.logging:9200" - name: ELASTICSEARCH_SSL_AUTHORITY value: "false" - name: ELASTICSEARCH_USER valueFrom: secretKeyRef: key: username name: elastic-auth - name: ELASTICSEARCH_PASSWORD valueFrom: secretKeyRef: key: password name: elastic-auth - name: LS_JAVA_OPTS value: "-Xmx512m -Xms512m" resources: requests: memory: 1Gi cpu: "500m" limits: memory: 1Gi cpu: "700m" |

安装 Kibana {#安装-Kibana}

参考这个安装: Kubernetes集群监控-使用EL(Fluentd)K实现日志监控和分析

浏览器访问 https://kibana.od.com/ 同样也是用户elastic 密码:admin123

验证EL (Fluentd) K集群 {#验证EL-Fluentd-K集群}

下面我们部署一个简单的测试应用, 新建 counter.yaml 文件,文件内容如下:

|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | apiVersion: v1 kind: Pod metadata: name: counter labels: logging: 'true' # 一定要具有该标签才会被采集 spec: containers: - name: count image: busybox args: [ /bin/sh , -c , 'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done' , ] |

该 Pod 只是简单将日志信息打印到 stdout ,所以正常来说 Fluentd 会收集到这个日志数据,在 Kibana 中也就可以找到对应的日志数据了。回到 Kibana Dashboard 页面,点击左侧最下面的 Management -> Stack Management ,进入管理页面,点击左侧 数据 下面的 索引管理 就会发现索引数据:

kibana日志索引

Discover

赞(1)
未经允许不得转载:工具盒子 » Kubernetes集群监控-Kafka对接EL(Fluentd)K构建超强日志体系