
Deploying and Running Flink on Kubernetes: A Hands-On Guide

I. Overview

At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same runtime. For distributed computation over data streams it provides data distribution, data communication, and fault tolerance.

Flink website:

https://flink.apache.org/

Documentation for each release:

https://nightlies.apache.org/flink

Official Flink on Kubernetes documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/

You can also refer to my earlier article, "Big Data Hadoop: Deploying the Flink Real-Time Stream Computing Engine":

https://www.cnblogs.com/liugp/p/16222693.html

GitHub:

https://github.com/apache/flink/tree/release-1.14.6/

II. Flink Run Modes

Official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/

Flink on YARN has three run modes:

  • yarn-session mode (Session Mode)
  • yarn-cluster mode (Per-Job Mode)
  • Application mode (Application Mode)

[Note] Per-Job mode is deprecated: it is supported only by YARN, was deprecated in Flink 1.15, and is slated for removal in FLINK-26000. The sketch below shows how each mode's counterpart is selected from the CLI.
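For orientation, the deployment mode is picked with the Flink CLI's `--target` option (equivalently `execution.target` in flink-conf.yaml). A minimal sketch of the analogous targets; the cluster IDs and jar paths here are placeholders:

```bash
# Session mode: submit into an already-running session cluster
./bin/flink run --target kubernetes-session \
    -Dkubernetes.cluster-id=<session-cluster-id> ./examples/streaming/TopSpeedWindowing.jar

# Application mode: a dedicated cluster is spun up for this one application
./bin/flink run-application --target kubernetes-application \
    -Dkubernetes.cluster-id=<app-cluster-id> local:///opt/flink/usrlib/my-job.jar

# Per-Job mode exists only on YARN (deprecated since Flink 1.15)
./bin/flink run --target yarn-per-job ./examples/streaming/TopSpeedWindowing.jar
```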

III. Flink on Kubernetes in Practice

1) Download Flink

Download page:

https://flink.apache.org/downloads.html

```bash
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
```

2) Build the base image

```bash
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
```

3) Session mode

A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job is submitted after the cluster has been deployed.
A Flink Session cluster deployment on Kubernetes has at least three components:

  • a Deployment that runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager's REST and UI ports
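Once the session cluster is up (steps below), those components can be spot-checked with kubectl; the object names here assume the cluster-id used later in this article:

```bash
kubectl -n flink get deploy,pods,svc
# Roughly expected:
#   deployment.apps/my-first-flink-cluster       -> JobManager
#   pod/my-first-flink-cluster-taskmanager-1-1   -> TaskManager pool
#   service/my-first-flink-cluster-rest          -> REST / Web UI
```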

1. Native Kubernetes mode

Configuration parameters:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

【1】Build the image. Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists into the running container
ENV LANG zh_CN.UTF-8
```

Build the image:

```bash
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
```

Push the image:

```bash
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
```

【2】Create the namespace and serviceaccount:

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
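Binding the cluster-wide `edit` role is convenient but broad. A more restrictive, namespace-scoped alternative could look like the sketch below; the exact resource list is an assumption and may need tuning for your Flink version:

```bash
# Grant only what Flink needs inside the flink namespace
kubectl create role flink-minimal -n flink \
    --verb=get,list,watch,create,update,patch,delete \
    --resource=pods,services,deployments,configmaps
kubectl create rolebinding flink-minimal-binding -n flink \
    --role=flink-minimal \
    --serviceaccount=flink:flink-service-account
```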

【3】Create the Flink cluster:

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
```

【4】Submit a job:

```bash
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    ./examples/streaming/TopSpeedWindowing.jar
```

The same submission with explicit resource parameters (reconstructed from the original's garbled parameter list; note that kubernetes.taskmanager.cpu takes a CPU count, not millicores):

```bash
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
```

[Note] Mind the JDK version; JDK 8 currently works as expected.

【5】Check:

```bash
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1
```

【6】Delete the Flink cluster:

```bash
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```
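Alternatively, the session cluster can be stopped gracefully from the client, which lets Flink clean up the Kubernetes resources it created:

```bash
# Attach to the running session and issue a stop
echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true
```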

2. Standalone mode

【1】Build the image

The default user in the official image is flink; here it is changed to admin. Swap in whatever user your organization requires. The startup script can be copied out of one of the pods started above.

Startup script docker-entrypoint.sh:

```bash
#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi

  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}

    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2

  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
        echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```
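The `FLINK_PROPERTIES` and `TASK_MANAGER_NUMBER_OF_TASK_SLOTS` hooks in `prepare_configuration` make the image tunable at run time without a rebuild. A quick local smoke test might look like this (values purely illustrative; note that `envsubst` requires the gettext package to be present in the image):

```bash
docker run --rm \
    -e TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4 \
    -e FLINK_PROPERTIES=$'taskmanager.memory.process.size: 2g\nparallelism.default: 2' \
    myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 taskmanager
```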

Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
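Build and push the image; the tag matches the one referenced by the manifests below (the same commands appear again in the application-mode section):

```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```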

【2】Create the namespace and serviceaccount:

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

【3】Write the YAML manifests
  • flink-configuration-configmap.yaml

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
    

  • jobmanager-service.yaml — optional Service, needed only in non-HA mode:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
    

  • jobmanager-rest-service.yaml — optional Service exposing the JobManager REST port as a Kubernetes NodePort:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```
    

  • taskmanager-query-state-service.yaml — optional Service exposing the TaskManager queryable-state port as a Kubernetes NodePort:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```
    

The manifests above are shared by both Standalone deployments (session and application mode).

  • jobmanager-session-deployment-non-ha.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

  • taskmanager-session-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
    

【4】Create the Flink cluster:

```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```
    

To reverse-engineer a Dockerfile from an image:

```bash
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
```
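If pulling the whaler image is not an option, `docker history` recovers a rougher view of the same layer commands:

```bash
docker history --no-trunc --format '{{.CreatedBy}}' flink:1.14.6-scala_2.12
```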
    

Check:

```bash
kubectl get pods,svc -n flink -owide
```

Web UI address:

http://192.168.182.110:30081/#/overview

【5】Submit a job (the `-m` host here is this article's node hostname):

```bash
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
```
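The `-m` address is any Kubernetes node plus the NodePort of `flink-jobmanager-rest`; rather than hard-coding 30081, the port can be read back from the Service (the node IP is from this article's environment):

```bash
NODE_PORT=$(kubectl get svc flink-jobmanager-rest -n flink \
    -o jsonpath='{.spec.ports[0].nodePort}')
./bin/flink run -m 192.168.182.110:${NODE_PORT} ./examples/streaming/WordCount.jar
```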
    
    
    
    
    
Tail a TaskManager log:

```bash
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
```

【6】Delete the Flink cluster:

```bash
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```
    
【7】Access the Flink Web UI

The port is the NodePort defined in jobmanager-rest-service.yaml:

http://192.168.182.110:30081/#/overview

4) Application mode (recommended)

A basic Flink Application cluster on Kubernetes consists of three components:

  • an application that runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service exposing the JobManager's REST and UI ports

1. Native Kubernetes mode (commonly used)

【1】Build the image. Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists into the running container
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
```

Build the image:

```bash
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
```

Push the image:

```bash
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
```

To delete the image (from Docker and, on containerd nodes, via crictl):

```bash
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
```
    

【2】Create the namespace and serviceaccount:

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
    

【3】Create the Flink cluster and submit the job:

```bash
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
```
    

[Note] `local://` is the only scheme supported in Application mode. It refers to the filesystem inside the pod/container, not the host.
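Jobs running on the application cluster can be listed from the client without going through kubectl:

```bash
./bin/flink list --target kubernetes-application \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.cluster-id=my-first-application-cluster
```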

Check:

```bash
kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
```

【4】Delete the Flink cluster:

```bash
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```
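In application mode the cluster's lifetime is bound to the job, so cancelling the job through the Flink CLI also tears the deployment down; `<jobId>` comes from `flink list` above:

```bash
./bin/flink cancel --target kubernetes-application \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.cluster-id=my-first-application-cluster <jobId>
```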
    

2. Standalone mode

【1】Build the image

The startup script docker-entrypoint.sh is identical to the one shown in the session-mode Standalone section above, so it is not repeated here.


Dockerfile:

```dockerfile
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache
# The script was copied to /opt/apache/, so make it executable there
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
    

```bash
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
```

【2】Create the namespace and serviceaccount:

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```

【3】Write the YAML manifests

The manifests flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml, and taskmanager-query-state-service.yaml are identical to those in the session-mode Standalone section above.
  • jobmanager-application-non-ha.yaml — non-HA:
    
    
    
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
          env:
          args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount","--output","/tmp/result"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf
            - name: job-artifacts-volume
              mountPath: /opt/apache/flink-1.14.6/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/nfsdata/flink/application/job-artifacts
```
    
    
    
    
> [Note] Job artifacts are mounted from the hostPath /mnt/nfsdata/flink/application/job-artifacts; this should preferably be a shared directory (e.g. NFS) so that every node sees the same jars.
    
    
  • taskmanager-job-deployment.yaml:
    
    
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
```
    
    
    
    
【4】Create the Flink cluster and submit the job:

```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```
    
    
    
Check:

```bash
kubectl get pods,svc -n flink
```
    
    
【5】Delete the Flink cluster:

```bash
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
```
    
    
    
【6】Inspect:

```bash
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
```

That concludes this walkthrough of Flink on Kubernetes. Only the official examples are demonstrated here; enterprise cases will follow in later posts. If you have any questions, leave me a comment, and I will keep sharing.
    
