A Detailed Guide to Deploying and Running Flink on K8s

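I. Overview

At its core, Flink is a streaming dataflow execution engine. On a single Flink runtime it supports both stream-processing and batch-processing applications, and it provides data distribution, data communication, and fault tolerance for distributed computation over data streams.

Flink website:

https://flink.apache.org/

Documentation for different versions:

https://nightlies.apache.org/flink

Official Flink on K8s documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/

You can also refer to my earlier article, "Big Data Hadoop: Deploying the Flink Real-Time Stream Computing Engine":

https://www.cnblogs.com/liugp/p/16222693.html

GitHub repository:

https://github.com/apache/flink/tree/release-1.14.6/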

https://github.com/apache/flink/tree/release-1.14.6/

II. Flink Deployment Modes

Official documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/

Flink on YARN has three deployment modes:

  • yarn-session mode (Session Mode)
  • yarn-cluster mode (Per-Job Mode)
  • Application mode (Application Mode)

[Note] Per-Job mode is supported only by YARN and has been deprecated since Flink 1.15. Its removal is tracked in FLINK-26000.
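As a rough illustration of how the three modes surface on the command line, the sketch below maps each mode to the value passed to `--target`. The function name `yarn_target` is made up for this example; the target strings themselves (`yarn-session`, `yarn-per-job`, `yarn-application`) are the ones the Flink CLI accepts.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map a YARN deployment mode to the Flink CLI --target value.
yarn_target() {
  case "$1" in
    session)     echo "yarn-session" ;;
    per-job)     echo "yarn-per-job" ;;      # deprecated since Flink 1.15
    application) echo "yarn-application" ;;  # used with `flink run-application`
    *)           echo "unknown mode: $1" >&2; return 1 ;;
  esac
}

# Print the mapping for all three modes.
for mode in session per-job application; do
  echo "${mode} -> --target $(yarn_target "${mode}")"
done
```

The Kubernetes sections that follow use the analogous targets `kubernetes-session` and `kubernetes-application`.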

III. Flink on K8s in Practice

1) Download Flink

Download page:

https://flink.apache.org/downloads.html

wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

2) Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12

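3) Session mode

A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job is submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:

  • a Deployment that runs the JobManager
  • a Deployment for a pool of TaskManagers
  • a Service that exposes the JobManager's REST and UI ports

1. Native Kubernetes mode

Configuration options:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace

[1] Build the image (Dockerfile)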
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# ENV persists into the running container; a RUN export only affects that single build step
ENV LANG=zh_CN.UTF-8

Build the image:

docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

Push the image:

docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12


[2] Create the namespace and service account

# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Create the Flink cluster

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster  \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort

[4] Submit a job

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar
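# Resource options. -D options must precede the jar path, because everything after the jar is passed to the job as program arguments.
# kubernetes.taskmanager.cpu takes a number of CPU cores (for example 2.0), not a Kubernetes quantity such as 2000m.

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.taskmanager.cpu=2.0 \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
./examples/streaming/WordCount.jar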
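The submission commands in this step share one shape: fixed session-cluster options, then any extra `-D` properties, then the jar. A minimal sketch of that shape is shown below; `build_flink_run_cmd` is a hypothetical helper that only prints the command line, and `parallelism.default=2` is just an example property.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a `flink run` command for a native Kubernetes session cluster.
# Arguments: <cluster-id> <namespace> <jar> [key=value ...]
build_flink_run_cmd() {
  local cluster_id=$1 namespace=$2 jar=$3
  shift 3
  local cmd="./bin/flink run --target kubernetes-session"
  cmd+=" -Dkubernetes.cluster-id=${cluster_id}"
  cmd+=" -Dkubernetes.namespace=${namespace}"
  local prop
  for prop in "$@"; do
    cmd+=" -D${prop}"   # every extra property goes before the jar
  done
  echo "${cmd} ${jar}"  # the jar comes last; anything after it would be a program argument
}

build_flink_run_cmd my-first-flink-cluster flink \
  ./examples/streaming/TopSpeedWindowing.jar parallelism.default=2
```

Printing the command first makes it easy to review the option order before running it against a real cluster.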


[Note] Mind the JDK version; JDK 8 is known to work.

[5] Check the status

kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink

[6] Delete the Flink cluster

kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force

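2. Standalone mode

[1] Build the image

The default user is flink; here I replace it with admin. Change the user to suit your organization's needs. The script can be taken from the pod started above.

Startup script docker-entrypoint.sh: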

#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
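The part of this script that most often needs explaining is `set_config_option`, which either overrides an existing key in flink-conf.yaml or appends a new one. The sketch below exercises that logic in isolation against a temporary file. It is an adaptation rather than the script's exact code: it writes through a temporary copy instead of `sed -i`, and anchors the replacement at the start of the line.

```shell
#!/usr/bin/env bash
# Stand-alone sketch of the override-or-append logic, run against a temp file.
CONF_FILE=$(mktemp)
printf 'jobmanager.rpc.address: localhost\n' > "${CONF_FILE}"

set_config_option() {
  local option=$1
  local value=$2
  # Escape periods so they match literally in the regular expressions below.
  local escaped_option
  escaped_option=$(echo "${option}" | sed -e 's/\./\\./g')

  if grep -qE "^${escaped_option}:.*" "${CONF_FILE}"; then
    # Key exists: rewrite the whole line with the new value.
    sed -e "s/^${escaped_option}:.*/${option}: ${value}/" "${CONF_FILE}" > "${CONF_FILE}.tmp"
    mv "${CONF_FILE}.tmp" "${CONF_FILE}"
  else
    # Key is missing: append it.
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

set_config_option jobmanager.rpc.address flink-jobmanager   # overrides the existing entry
set_config_option blob.server.port 6124                     # appends a new entry
cat "${CONF_FILE}"
```

Values that contain a `/` would need a different sed delimiter; the values used by the entrypoint (host names and port numbers) do not.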


Write the Dockerfile:

FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Set the working directory
WORKDIR $FLINK_HOME

# Expose ports
EXPOSE 6123 8081

# Entrypoint script: not executed at build time, only when a container starts
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]


[2] Create the namespace and service account

# Create the namespace
kubectl create ns flink
# Create the service account
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

[3] Write the YAML files

  • flink-configuration-configmap.yaml

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      data:
        flink-conf.yaml: |+
          jobmanager.rpc.address: flink-jobmanager
          taskmanager.numberOfTaskSlots: 2
          blob.server.port: 6124
          jobmanager.rpc.port: 6123
          taskmanager.rpc.port: 6122
          queryable-state.proxy.ports: 6125
          jobmanager.memory.process.size: 3200m
          taskmanager.memory.process.size: 2728m
          taskmanager.memory.flink.size: 2280m
          parallelism.default: 2
        log4j-console.properties: |+
          # This affects logging for both user code and Flink
          rootLogger.level = INFO
          rootLogger.appenderRef.console.ref = ConsoleAppender
          rootLogger.appenderRef.rolling.ref = RollingFileAppender

          # Uncomment this if you want to _only_ change Flink's logging
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO

          # The following lines keep the log level of common libraries/connectors on
          # log level INFO. The root logger does not override this. You have to manually
          # change the log levels here.
          logger.akka.name = akka
          logger.akka.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO

          # Log all infos to the console
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

          # Log all infos in the given rolling file
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size=100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10

          # Suppress the irrelevant (wrong) warnings from the Netty channel handler
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF

    jobmanager-service.yaml: an optional service, needed only in non-HA mode.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP
        ports:
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:
          app: flink
          component: jobmanager
    

    jobmanager-rest-service.yaml: an optional service that exposes the JobManager REST port as a Kubernetes node port.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager-rest
      spec:
        type: NodePort
        ports:
        - name: rest
          port: 8081
          targetPort: 8081
          nodePort: 30081
        selector:
          app: flink
          component: jobmanager
    

    taskmanager-query-state-service.yaml: an optional service that exposes the TaskManager queryable-state port as a Kubernetes node port.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-taskmanager-query-state
      spec:
        type: NodePort
        ports:
        - name: query-state
          port: 6125
          targetPort: 6125
          nodePort: 30025
        selector:
          app: flink
          component: taskmanager
    

    The files above are common resource definitions, shared by the session and application deployments.

    • jobmanager-session-deployment-non-ha.yaml

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-jobmanager
      spec:
        replicas: 1
        selector:
          matchLabels:
            app: flink
            component: jobmanager
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            containers:
            - name: jobmanager
              image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
              args: ["jobmanager"]
              ports:
              - containerPort: 6123
                name: rpc
              - containerPort: 6124
                name: blob-server
              - containerPort: 8081
                name: webui
              livenessProbe:
                tcpSocket:
                  port: 6123
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/apache/flink-1.14.6/conf/
              securityContext:
                runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties

    taskmanager-session-deployment.yaml

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2
        selector:
          matchLabels:
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager
              image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
              args: ["taskmanager"]
              ports:
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query-state
              livenessProbe:
                tcpSocket:
                  port: 6122
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/apache/flink-1.14.6/conf/
              securityContext:
                runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
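The capacity of this session cluster follows directly from the manifests: the number of TaskManager replicas times the slots per TaskManager. The short sketch below does that arithmetic with the values used here. It assumes each job occupies as many slots as its parallelism, which is the case with Flink's default slot sharing; the variable names are only for illustration.

```shell
#!/usr/bin/env bash
# Values taken from the manifests in this section.
taskmanager_replicas=2     # spec.replicas in taskmanager-session-deployment.yaml
slots_per_taskmanager=2    # taskmanager.numberOfTaskSlots in flink-conf.yaml
default_parallelism=2      # parallelism.default in flink-conf.yaml

total_slots=$(( taskmanager_replicas * slots_per_taskmanager ))
concurrent_jobs=$(( total_slots / default_parallelism ))

echo "total task slots: ${total_slots}"
echo "jobs at default parallelism that fit at once: ${concurrent_jobs}"
```

To run more jobs at once, raise `replicas` or `taskmanager.numberOfTaskSlots`, keeping the TaskManager memory settings in mind.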
    

    [4] Create the Flink cluster

      # Skip this if the namespace already exists from step [2]
      kubectl create ns flink
      # Configuration and service definition
      kubectl create -f flink-configuration-configmap.yaml -n flink

      # Services
      kubectl create -f jobmanager-service.yaml -n flink
      kubectl create -f jobmanager-rest-service.yaml -n flink
      kubectl create -f taskmanager-query-state-service.yaml -n flink

      # Create the deployments for the cluster
      kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
      kubectl create -f taskmanager-session-deployment.yaml -n flink


    Reverse-engineer a Dockerfile from an image:

      alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
      whaler flink:1.14.6-scala_2.12

    Check the status:

      kubectl get pods,svc -n flink -owide

    Web UI address:

    http://192.168.182.110:30081/#/overview

    [5] Submit a job

      ./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar

      kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink

    
    
    
    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/b5a680b3c7b54843b111764026e50534.png.jpg)
    

    [6] Delete the Flink cluster

      kubectl delete -f jobmanager-service.yaml -n flink
      kubectl delete -f jobmanager-rest-service.yaml -n flink
      kubectl delete -f taskmanager-query-state-service.yaml -n flink
      kubectl delete -f flink-configuration-configmap.yaml -n flink
      kubectl delete -f taskmanager-session-deployment.yaml -n flink
      kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
      kubectl delete ns flink --force

    [7] Access the Flink web UI

    The port is the nodePort defined in jobmanager-rest-service.yaml:

    http://192.168.182.110:30081/#/overview

    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/1b6d5b88d27142cd80d6f348826cdcf5.png.jpg)
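The same NodePort also serves Flink's REST API, which is handy for checking the cluster from a script instead of a browser. The sketch below only composes the URLs; `flink_rest_url` is a hypothetical helper, and the host and port are the ones used in this article. `/overview` and `/jobs/overview` are standard Flink REST endpoints.

```shell
#!/usr/bin/env bash
# Hypothetical helper: compose a URL for the JobManager REST endpoint behind the NodePort service.
# Arguments: <node-host> <node-port> [path], where path defaults to /overview
flink_rest_url() {
  local host=$1 port=$2 path=${3:-/overview}
  echo "http://${host}:${port}${path}"
}

flink_rest_url 192.168.182.110 30081                  # cluster overview
flink_rest_url 192.168.182.110 30081 /jobs/overview   # job list
```

With a reachable cluster, `curl -s "$(flink_rest_url 192.168.182.110 30081 /jobs/overview)"` should return the job list as JSON.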
    

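    4) Application mode (recommended)

    A basic Flink Application cluster deployment in Kubernetes has three components:

    • an application that runs the JobManager

    • a Deployment for a pool of TaskManagers

    • a Service that exposes the JobManager's REST and UI ports

    1. Native Kubernetes mode (commonly used)

    [1] Build the image (Dockerfile)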
      FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
      RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
      # ENV persists into the running container; a RUN export only affects that single build step
      ENV LANG=zh_CN.UTF-8
      RUN mkdir -p $FLINK_HOME/usrlib
      COPY  TopSpeedWindowing.jar $FLINK_HOME/usrlib/
    

    Build the image:

      docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

    Push the image:

      docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

    Remove the local image:

      docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
      crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12


    [2] Create the namespace and service account

      # Create the namespace
      kubectl create ns flink
      # Create the service account
      kubectl create serviceaccount flink-service-account -n flink
      # Grant permissions
      kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

    [3] Create the Flink cluster and submit the job

      ./bin/flink run-application \
          --target kubernetes-application \
          -Dkubernetes.cluster-id=my-first-application-cluster  \
       -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
       -Dkubernetes.jobmanager.replicas=1 \
       -Dkubernetes.namespace=flink \
       -Dkubernetes.jobmanager.service-account=flink-service-account \
       -Dexternal-resource.limits.kubernetes.cpu=2000m \
       -Dexternal-resource.limits.kubernetes.memory=2Gi \
       -Dexternal-resource.requests.kubernetes.cpu=1000m \
       -Dexternal-resource.requests.kubernetes.memory=1Gi \
       -Dkubernetes.rest-service.exposed.type=NodePort \
       local:///opt/flink/usrlib/TopSpeedWindowing.jar
    

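    [Note] local is the only scheme supported in application mode. It refers to the local environment of the pod or container, not the host machine, so the jar must already exist at that path inside the image.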
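Because a host-side path is an easy mistake to make here, a small pre-flight check can catch it before `run-application` is called. This is only a sketch: `is_local_jar_uri` is a hypothetical helper, and it checks the form of the URI, not whether the jar really exists in the image.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check: accept only jar URIs that use the local:// scheme.
is_local_jar_uri() {
  case "$1" in
    local:///*.jar) return 0 ;;
    *)              return 1 ;;
  esac
}

for uri in local:///opt/flink/usrlib/TopSpeedWindowing.jar ./examples/streaming/TopSpeedWindowing.jar; do
  if is_local_jar_uri "${uri}"; then
    echo "ok:     ${uri}"
  else
    echo "reject: ${uri} (not a local:// path inside the image)"
  fi
done
```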

    Check the status:

      kubectl get pods,svc -n flink

      kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink

    
    
    
    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/2e9ce711781f46ab91793e604f650b96.png.jpg)
    
    
    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/30877ab2fdab4edb9a3e0c4fe2773a6b.png.jpg)
    

    [4] Delete the Flink cluster

      kubectl delete deployment/my-first-application-cluster -n flink
      kubectl delete ns flink --force
    

    2. Standalone mode

    [1] Build the image (Dockerfile)

    Startup script docker-entrypoint.sh:

      #!/usr/bin/env bash

      ###############################################################################
      #  Licensed to the Apache Software Foundation (ASF) under one
      #  or more contributor license agreements.  See the NOTICE file
      #  distributed with this work for additional information
      #  regarding copyright ownership.  The ASF licenses this file
      #  to you under the Apache License, Version 2.0 (the
      #  "License"); you may not use this file except in compliance
      #  with the License.  You may obtain a copy of the License at
      #
      #      http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      #  limitations under the License.
      ###############################################################################

      COMMAND_STANDALONE="standalone-job"
      COMMAND_HISTORY_SERVER="history-server"

      # If unspecified, the hostname of the container is taken as the JobManager address
      JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
      CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

      drop_privs_cmd() {
          if [ $(id -u) != 0 ]; then
              # Don't need to drop privs if EUID != 0
              return
          elif [ -x /sbin/su-exec ]; then
              # Alpine
              echo su-exec admin
          else
              # Others
              echo gosu admin
          fi
      }

      copy_plugins_if_required() {
          if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
              return 0
          fi

          echo "Enabling required built-in plugins"
          for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
              echo "Linking ${target_plugin} to plugin directory"
              plugin_name=${target_plugin%.jar}

              mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
              if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
                  echo "Plugin ${target_plugin} does not exist. Exiting."
                  exit 1
              else
                  ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
                  echo "Successfully enabled ${target_plugin}"
              fi
          done
      }

      set_config_option() {
          local option=$1
          local value=$2

          # escape periods for usage in regular expressions
          local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

          # either override an existing entry, or append a new one
          if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
              sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
          else
              echo "${option}: ${value}" >> "${CONF_FILE}"
          fi
      }

      prepare_configuration() {
          set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
          set_config_option blob.server.port 6124
          set_config_option query.server.port 6125

          if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
              set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
          fi

          if [ -n "${FLINK_PROPERTIES}" ]; then
              echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
          fi
          envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
      }

      maybe_enable_jemalloc() {
          if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
              JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
              JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
              if [ -f "$JEMALLOC_PATH" ]; then
                  export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
              elif [ -f "$JEMALLOC_FALLBACK" ]; then
                  export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
              else
                  if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                      MSG_PATH=$JEMALLOC_PATH
                  else
                      MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
                  fi
                  echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
              fi
          fi
      }

      maybe_enable_jemalloc

      copy_plugins_if_required

      prepare_configuration

      args=("$@")
      if [ "$1" = "help" ]; then
          printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
          printf "    Or $(basename "$0") help\n\n"
          printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
          exit 0
      elif [ "$1" = "jobmanager" ]; then
          args=("${args[@]:1}")

          echo "Starting Job Manager"

          exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
      elif [ "$1" = ${COMMAND_STANDALONE} ]; then
          args=("${args[@]:1}")

          echo "Starting Job Manager"

          exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
      elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
          args=("${args[@]:1}")

          echo "Starting History Server"

          exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
      elif [ "$1" = "taskmanager" ]; then
          args=("${args[@]:1}")

          echo "Starting Task Manager"

          exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
      fi

      args=("${args[@]}")

      # Running command in pass-through mode
      exec $(drop_privs_cmd) "${args[@]}"


    Write the Dockerfile:

      FROM myharbor.com/bigdata/centos:7.9.2009

      USER root

      # Install common tools
      RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

      # Set the time zone (the default is UTC)
      RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

      RUN mkdir -p /opt/apache

      ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

      ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

      ENV FLINK_HOME /opt/apache/flink-1.14.6
      ENV JAVA_HOME /opt/apache/jdk1.8.0_212
      ENV PATH $JAVA_HOME/bin:$PATH

      # Create the directory for user application jars
      RUN mkdir $FLINK_HOME/usrlib/

      #RUN mkdir home
      COPY docker-entrypoint.sh /opt/apache/

      RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

      RUN chown -R admin:admin /opt/apache
      # The script is copied to /opt/apache/, so chmod must target that path
      RUN chmod +x /opt/apache/docker-entrypoint.sh

      # Set the working directory
      WORKDIR $FLINK_HOME

      # Expose ports
      EXPOSE 6123 8081

      # Entrypoint script: not executed at build time, only when a container starts
      ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
      CMD ["help"]


      # Build the image
      docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

      # Push the image
      docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

      # Remove the local image
      docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

    [2] Create the namespace and service account

      # Create the namespace
      kubectl create ns flink
      # Create the service account
      kubectl create serviceaccount flink-service-account -n flink
      # Grant permissions
      kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account

    [3] Write the YAML files

    flink-configuration-configmap.yaml

      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      data:
        flink-conf.yaml: |+
          jobmanager.rpc.address: flink-jobmanager
          taskmanager.numberOfTaskSlots: 2
          blob.server.port: 6124
          jobmanager.rpc.port: 6123
          taskmanager.rpc.port: 6122
          queryable-state.proxy.ports: 6125
          jobmanager.memory.process.size: 3200m
          taskmanager.memory.process.size: 2728m
          taskmanager.memory.flink.size: 2280m
          parallelism.default: 2
        log4j-console.properties: |+
          # This affects logging for both user code and Flink
          rootLogger.level = INFO
          rootLogger.appenderRef.console.ref = ConsoleAppender
          rootLogger.appenderRef.rolling.ref = RollingFileAppender
    
          # Uncomment this if you want to _only_ change Flink's logging
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO

          # The following lines keep the log level of common libraries/connectors on
          # log level INFO. The root logger does not override this. You have to manually
          # change the log levels here.
          logger.akka.name = akka
          logger.akka.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO

          # Log all infos to the console
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

          # Log all infos in the given rolling file
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size=100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10

          # Suppress the irrelevant (wrong) warnings from the Netty channel handler
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF
    

    jobmanager-service.yaml: an optional service, needed only in non-HA mode.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP
        ports:
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:
          app: flink
          component: jobmanager
    

    jobmanager-rest-service.yaml: an optional service that exposes the JobManager REST port as a Kubernetes node port.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager-rest
      spec:
        type: NodePort
        ports:
        - name: rest
          port: 8081
          targetPort: 8081
          nodePort: 30081
        selector:
          app: flink
          component: jobmanager
    

    taskmanager-query-state-service.yaml: an optional service that exposes the TaskManager queryable-state port as a Kubernetes node port.

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-taskmanager-query-state
      spec:
        type: NodePort
        ports:
        - name: query-state
          port: 6125
          targetPort: 6125
          nodePort: 30025
        selector:
          app: flink
          component: taskmanager
    

    jobmanager-application-non-ha.yaml (non-HA):

      apiVersion: batch/v1
      kind: Job
      metadata:
        name: flink-jobmanager
      spec:
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            restartPolicy: OnFailure
            containers:
              - name: jobmanager
                image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
                env:
                args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
                ports:
                  - containerPort: 6123
                    name: rpc
                  - containerPort: 6124
                    name: blob-server
                  - containerPort: 8081
                    name: webui
                livenessProbe:
                  tcpSocket:
                    port: 6123
                  initialDelaySeconds: 30
                  periodSeconds: 60
                volumeMounts:
                  - name: flink-config-volume
                    mountPath: /opt/apache/flink-1.14.6/conf
                  - name: job-artifacts-volume
                    mountPath: /opt/apache/flink-1.14.6/usrlib
                securityContext:
                  runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
              - name: flink-config-volume
                configMap:
                  name: flink-config
                  items:
                    - key: flink-conf.yaml
                      path: flink-conf.yaml
                    - key: log4j-console.properties
                      path: log4j-console.properties
              - name: job-artifacts-volume
                hostPath:
                  path: /mnt/nfsdata/flink/application/job-artifacts
    

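    > 【Tip】Pay attention to the hostPath mount here (/mnt/nfsdata/flink/application/job-artifacts in the manifest above, mounted as usrlib in the container). A hostPath is local to each node, so it is best to back it with a shared directory that every node can see.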
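
The job class `org.apache.flink.examples.java.wordcount.WordCount` has to be on the classpath, which in this setup means its jar must be present in the mounted artifacts directory. A minimal sketch of staging it, assuming the Flink 1.14.6 distribution downloaded earlier has been unpacked locally (the `./flink-1.14.6` location in the usage line is an assumption) and that the target directory is the shared hostPath from the manifest:

```shell
# Copy the batch WordCount example jar into the directory that is mounted
# into the pods as usrlib.
# $1: path of the unpacked Flink distribution (assumed location).
# $2: artifacts directory; defaults to the hostPath used in the manifests.
stage_job_artifacts() {
  local flink_home="$1"
  local target="${2:-/mnt/nfsdata/flink/application/job-artifacts}"
  mkdir -p "$target" || return 1
  cp "${flink_home}/examples/batch/WordCount.jar" "$target/" || return 1
  ls -l "$target"
}

# Usage: stage_job_artifacts ./flink-1.14.6
```

If the directory is not shared, the copy has to be repeated on every node that may schedule a JobManager or TaskManager pod.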

    taskmanager-job-deployment.yaml

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2
        selector:
          matchLabels:
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager
              image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
              env:
              args: ["taskmanager"]
              ports:
              - containerPort: 6122
                name: rpc
              - containerPort: 6125
                name: query-state
              livenessProbe:
                tcpSocket:
                  port: 6122
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:
              - name: flink-config-volume
                mountPath: /opt/apache/flink-1.14.6/conf
              - name: job-artifacts-volume
                mountPath: /opt/apache/flink-1.14.6/usrlib
              securityContext:
                runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
            volumes:
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: flink-conf.yaml
                  path: flink-conf.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
            - name: job-artifacts-volume
              hostPath:
                path: /mnt/nfsdata/flink/application/job-artifacts
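
The Deployment above starts two TaskManagers (`replicas: 2`). After it has been created in step 【4】, the number of TaskManagers can be changed without editing the YAML. The helper below is a hypothetical sketch, not part of the official example; it only wraps the standard `kubectl scale` and `kubectl rollout status` commands and assumes the `flink` namespace used throughout this article.

```shell
# Scale the TaskManager Deployment and wait until the rollout settles.
# $1: desired number of TaskManager replicas.
# $2: namespace; defaults to "flink" as used in this article.
scale_taskmanagers() {
  local replicas="$1" ns="${2:-flink}"
  kubectl scale deployment/flink-taskmanager --replicas="$replicas" -n "$ns" || return 1
  kubectl rollout status deployment/flink-taskmanager -n "$ns" --timeout=120s
}

# Usage: scale_taskmanagers 3
```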
    

    【4】Create the Flink cluster and submit the job

      kubectl create ns flink
      # Configuration and service definition
      kubectl create -f flink-configuration-configmap.yaml -n flink

      # Services
      kubectl create -f jobmanager-service.yaml -n flink
      kubectl create -f jobmanager-rest-service.yaml -n flink
      kubectl create -f taskmanager-query-state-service.yaml -n flink

      # Create the deployments for the cluster
      kubectl create -f jobmanager-application-non-ha.yaml -n flink
      kubectl create -f taskmanager-job-deployment.yaml -n flink
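
Because the JobManager runs as a Kubernetes Job here, its completion can be tracked with `kubectl wait`. The function below is a sketch under one assumption: the submitted job is bounded, like the WordCount example, so the Kubernetes Job eventually reaches the `complete` condition. A streaming job would simply run until the timeout. The function name and the default timeout are illustrative.

```shell
# Show the Kubernetes Job that runs the JobManager, wait for it to complete,
# then print the tail of its log.
# $1: namespace (default "flink"); $2: timeout for kubectl wait (default 300s).
wait_for_flink_job() {
  local ns="${1:-flink}" timeout="${2:-300s}"
  kubectl get job flink-jobmanager -n "$ns" || return 1
  kubectl wait --for=condition=complete job/flink-jobmanager -n "$ns" --timeout="$timeout" || return 1
  kubectl logs job/flink-jobmanager -n "$ns" --tail=20
}

# Usage: wait_for_flink_job flink 600s
```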
    


    Check the pods and services:

      kubectl get pods,svc -n flink
    
    
    
    
    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/eaec20fd1cfe4d04840b4db2096a3dda.png.jpg)
    
    【5】Delete the Flink cluster
    
    
      kubectl delete -f flink-configuration-configmap.yaml -n flink
      kubectl delete -f jobmanager-service.yaml -n flink
      kubectl delete -f jobmanager-rest-service.yaml -n flink
      kubectl delete -f taskmanager-query-state-service.yaml -n flink
      kubectl delete -f jobmanager-application-non-ha.yaml -n flink
      kubectl delete -f taskmanager-job-deployment.yaml -n flink
      kubectl delete ns flink --force
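
If some of these resources were never created, `kubectl delete -f` reports an error for each missing one. A tolerant variant can loop over the same manifest files with `--ignore-not-found`. This is a hypothetical helper, assuming the six YAML files from this section sit in the current directory; it removes the workloads first and the ConfigMap last.

```shell
# Delete all resources of the application cluster, ignoring missing ones.
# $1: namespace; defaults to "flink".
delete_flink_app() {
  local ns="${1:-flink}" f
  for f in taskmanager-job-deployment.yaml \
           jobmanager-application-non-ha.yaml \
           taskmanager-query-state-service.yaml \
           jobmanager-rest-service.yaml \
           jobmanager-service.yaml \
           flink-configuration-configmap.yaml; do
    kubectl delete -f "$f" -n "$ns" --ignore-not-found
  done
}

# Usage: delete_flink_app flink
```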
    


    【6】Inspect the result

      kubectl get pods,svc -n flink
      kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
    
    
    
    
    ![](http://static.51tbox.com/static/2024-11-22/col/31ad3ac60e7e93fca27e965fba9d2c2b/838b0a5753f942659ad8cbcc3f3d49cb.png.jpg)
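
The pod name in the `kubectl exec` command above contains a generated suffix, so it differs on every deployment. While the cluster is still running, the name can be looked up through the labels from taskmanager-job-deployment.yaml instead. The sketch below makes two assumptions: the WordCount output path `/tmp/result` (from the `--output` argument) lives inside a TaskManager container, and the first TaskManager returned is the one that wrote it; with several TaskManagers you may need to check each pod.

```shell
# Print the name of the first TaskManager pod, selected by its labels.
first_taskmanager_pod() {
  local ns="${1:-flink}"
  kubectl get pods -n "$ns" -l app=flink,component=taskmanager \
    -o jsonpath='{.items[0].metadata.name}'
}

# List the WordCount output path inside that pod. Depending on the job's
# parallelism, /tmp/result may be a single file or a directory of part files.
show_wordcount_result() {
  local ns="${1:-flink}" pod
  pod="$(first_taskmanager_pod "$ns")" || return 1
  if [ -z "$pod" ]; then
    echo "no taskmanager pod found in namespace $ns" >&2
    return 1
  fi
  kubectl exec "$pod" -n "$ns" -- ls -lR /tmp/result
}

# Usage: show_wordcount_result flink
```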
    

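    This walkthrough of Flink on K8s only uses the official examples for demonstration; related enterprise cases will follow later. If you have any questions, feel free to leave me a comment. I will keep sharing.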
