I. Overview
At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same runtime. For distributed computation over data streams it provides data distribution, data communication, and fault-tolerance mechanisms.
Flink official site:
Documentation for each release:
Flink on k8s official documentation:
You can also refer to my earlier article, "Big Data Hadoop: Deploying the Real-Time/Stream Computing Engine Flink":
GitHub address:
II. Flink Run Modes
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/
Flink on YARN has three run modes:
- yarn-session (Session Mode)
- yarn-cluster (Per-Job Mode)
- Application Mode

[Note] Per-Job mode is deprecated. It is only supported by YARN, was deprecated in Flink 1.15, and its removal is tracked in FLINK-26000.
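For orientation, here is roughly how each mode is launched against YARN (a sketch, not from the original article; it uses the example jars shipped with the Flink distribution and assumes you run from FLINK_HOME):

```bash
# Session mode: start a long-running session, then submit jobs into it
./bin/yarn-session.sh -d
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# Per-Job mode (deprecated since Flink 1.15): a dedicated cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/TopSpeedWindowing.jar

# Application mode: the job's main() runs inside the cluster on the JobManager
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
```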
III. Hands-On: Flink on Kubernetes
1) Download Flink
Download URL:
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
2) Build the base image
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
3) Session mode
A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job has to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:
- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
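Once the Standalone-mode manifests later in this section are applied, a quick way to sanity-check those three components (a sketch; the resource names match the manifests below):

```bash
kubectl get deployment flink-jobmanager -n flink    # JobManager Deployment
kubectl get deployment flink-taskmanager -n flink   # TaskManager pool Deployment
kubectl get service flink-jobmanager -n flink       # REST/UI Service
```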
1. Native Kubernetes mode
Parameter configuration:
[1] Build the image (Dockerfile)
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists in the final image
ENV LANG zh_CN.UTF-8
Build the image:
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
Push the image:
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
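The clusterrolebinding above grants edit rights cluster-wide. If you prefer to scope permissions to the flink namespace only, a namespaced RoleBinding is an alternative (a sketch, not from the original article):

```bash
kubectl create rolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=flink:flink-service-account \
    -n flink
```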
[3] Create the Flink cluster
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
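If the session already exists, the same script can re-attach to it instead of creating a new one (per the Flink native-Kubernetes docs):

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true
```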
[4] Submit a job
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar
# Parameter configuration: resource-related options that can be appended to the run command above, with WordCount.jar as the example job
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
./examples/streaming/WordCount.jar
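Jobs running in the session can also be listed and cancelled from the client, addressing the cluster by its cluster-id (per the Flink native-Kubernetes docs):

```bash
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink

# Cancel a job by the JobID printed by `flink list` (<jobId> is a placeholder)
./bin/flink cancel --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink <jobId>
```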
[Note] Mind the JDK version; JDK 8 is known to work at the moment.
[5] Check
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
[6] Delete the Flink cluster
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
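Deleting the Deployment kills the cluster abruptly; the Flink docs also describe a graceful shutdown by attaching to the session and sending stop:

```bash
echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true
```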
2. Standalone mode
[1] Build the image
The default user is flink; here I switch it to admin (change the user to fit your organization). The startup script can be copied out of one of the pods started above.
Startup script docker-entrypoint.sh:
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
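For reference, this is how the entrypoint is typically exercised once baked into the image (a sketch; the env vars are the ones the script reads, and the image name matches the manifests below):

```bash
# Run a TaskManager with 4 slots, pointing it at the JobManager service
docker run --rm \
    -e JOB_MANAGER_RPC_ADDRESS=flink-jobmanager \
    -e TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4 \
    myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 taskmanager
```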
Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the timezone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache
ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

#RUN mkdir home
COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
[2] Create the namespace and serviceaccount
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Compose the YAML manifests
flink-configuration-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml (optional service, only required in non-HA mode):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml (optional service that exposes the JobManager rest port as a Kubernetes NodePort):

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
taskmanager-query-state-service.yaml (optional service that exposes the TaskManager queryable-state port as a Kubernetes NodePort):

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
The manifests above are shared (Application mode below reuses them).

jobmanager-session-deployment-non-ha.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
[4] Create the Flink cluster
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
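Before submitting anything, it is worth waiting until both Deployments report ready (a sketch):

```bash
kubectl rollout status deployment/flink-jobmanager -n flink
kubectl rollout status deployment/flink-taskmanager -n flink
```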
Reverse-engineering a Dockerfile from an image:

alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
Check:

kubectl get pods,svc -n flink -owide
Web UI address:
http://192.168.182.110:30081/#/overview
[5] Submit a job

./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
[6] Delete the Flink cluster

kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
[7] Access the Flink Web UI
The port is the NodePort defined in jobmanager-rest-service.yaml:
http://192.168.182.110:30081/#/overview
4) Application mode (recommended)
A basic Flink Application cluster deployment in Kubernetes has three components:
- an Application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1. Native Kubernetes mode (commonly used)
[1] Build the image (Dockerfile)
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# Use ENV rather than RUN export so the locale persists in the final image
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
Build the image:

docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

# Delete the image
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Create the Flink cluster and submit the job

./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
[Note] local:// is the only scheme supported in Application mode. Here local refers to the filesystem inside the pod/container, not the host.

Check:

kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
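Jobs on the application cluster can be listed and cancelled from the client in the same way (per the Flink native-Kubernetes docs):

```bash
./bin/flink list --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink

# Cancel with the JobID reported by `flink list` (<jobId> is a placeholder)
./bin/flink cancel --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink <jobId>
```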
[4] Delete the Flink cluster

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
2. Standalone mode
[1] Build the image (Dockerfile)
The startup script docker-entrypoint.sh is the same one used in the Session-mode Standalone section above.
The Dockerfile is likewise identical to the Session-mode one above; just make sure chmod +x targets /opt/apache/docker-entrypoint.sh, the path the script is copied to.
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# Delete the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount

# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions to the serviceaccount
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Compose the YAML manifests
flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml, and taskmanager-query-state-service.yaml are identical to the ones in the Session-mode Standalone section above.

jobmanager-application-non-ha.yaml (non-HA):

apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts

[Note] Mind the job-artifacts (usrlib) mount here; it should preferably be backed by a shared directory.

taskmanager-job-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts

[4] Create the Flink cluster and submit the job

kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the job and the TaskManager deployment
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink

Check:

kubectl get pods,svc -n flink

[5] Delete the Flink cluster

kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force

[6] Check

kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash

That wraps up this explanation and hands-on walkthrough of Flink on k8s. Only the official examples were used for the demos here; enterprise cases will follow in later posts. If you have any questions, feel free to leave me a message, and I will keep sharing.