1. Overview
At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on top of the same runtime. For distributed computation over data streams it provides data distribution, inter-task communication, and fault-tolerance mechanisms.
Flink official site:
Documentation for individual releases:
Flink on K8s official documentation:
You can also refer to my earlier article "Big Data Hadoop: Deploying the Flink Real-Time/Stream Computing Engine":
GitHub repository:
2. Flink Deployment Modes
Official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/overview/
Flink on YARN has three deployment modes:
- yarn-session (Session Mode)
- yarn-cluster (Per-Job Mode)
- Application Mode

[Note] Per-Job mode is deprecated: it is only supported by YARN and was deprecated in Flink 1.15. Its removal is tracked in FLINK-26000.
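For orientation, the three modes are typically launched as follows. This is a sketch against the Flink 1.14 CLI; the YARN application ID is a placeholder you would take from the `yarn-session.sh` output.

```bash
# Session mode: start a long-running session, then submit jobs into it
./bin/yarn-session.sh -d
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY \
    ./examples/streaming/WordCount.jar

# Per-job mode (deprecated since Flink 1.15): one dedicated cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/WordCount.jar

# Application mode: the job's main() runs on the JobManager
./bin/flink run-application -t yarn-application ./examples/streaming/WordCount.jar
```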
3. Flink on K8s in Practice
1) Download Flink
Download link:
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
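Unpack the distribution; all `./bin/flink` commands below are run from the extracted directory. The version flag is just a quick sanity check.

```bash
tar -xzf flink-1.14.6-bin-scala_2.12.tgz
cd flink-1.14.6
./bin/flink --version   # should report 1.14.6
```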
2) Build the base image
docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
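If the private Harbor registry requires authentication, log in before pushing; the registry address here is this article's example value.

```bash
docker login myharbor.com
```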
3) Session mode
A Flink Session cluster runs as a long-lived Kubernetes Deployment. You can run multiple Flink jobs on one Session cluster; each job has to be submitted to the cluster after the cluster has been deployed.
A Flink Session cluster deployment in Kubernetes has at least three components:
- a Deployment which runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1. Native Kubernetes mode
Parameter reference:
[1] Build the image: Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# export in a RUN layer does not persist into the final image; set the locale via ENV instead
ENV LANG zh_CN.UTF-8
Build the image:
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache
Push the image:
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount
# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
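Before starting the cluster, you can verify that the binding actually grants the service account the rights it needs; `kubectl auth can-i` is stock kubectl and should print "yes" here.

```bash
kubectl auth can-i create pods -n flink \
    --as=system:serviceaccount:flink:flink-service-account
```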
[3] Create the Flink cluster
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
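On success, kubernetes-session.sh prints the JobManager Web UI address. Should you need to re-attach to the running session from another shell later, the documented way is:

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dexecution.attached=true
```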
[4] Submit a job
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar
# with resource parameters (-D options must come before the jar)
./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.taskmanager.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.cpu=4000m \
    -Dexternal-resource.limits.kubernetes.memory=10Gi \
    -Dexternal-resource.requests.kubernetes.cpu=2000m \
    -Dexternal-resource.requests.kubernetes.memory=8Gi \
    ./examples/streaming/WordCount.jar
[Note] Mind the JDK version; JDK 8 currently works fine.
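Jobs in the session cluster can also be listed and cancelled from the CLI, as documented for native Kubernetes; `<jobId>` is a placeholder taken from the list output.

```bash
./bin/flink list --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster -Dkubernetes.namespace=flink

./bin/flink cancel --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster -Dkubernetes.namespace=flink <jobId>
```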
[5] Check
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1
[6] Delete the Flink cluster
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
2. Standalone mode
[1] Build the image
The default user is flink; here I change it to admin (swap in whatever user your organization requires). The stock startup script can be copied out of one of the pods started above.
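For example, assuming the TaskManager pod from the native-session example is still running (the official image keeps its entrypoint at /docker-entrypoint.sh):

```bash
kubectl -n flink exec my-first-flink-cluster-taskmanager-1-1 -- \
    cat /docker-entrypoint.sh > docker-entrypoint.sh
```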
Startup script docker-entrypoint.sh:
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi

  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}

    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2

  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
    sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
    echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")
    echo "Starting Job Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")
    echo "Starting History Server"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")
    echo "Starting Task Manager"
    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
Next, the Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009
USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/
ADD flink-1.14.6-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Create the directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint runs when a container starts, not at image build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
[2] Create the namespace and serviceaccount
# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Compose the YAML files
flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
jobmanager-service.yaml — an optional service, required only in non-HA mode:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml — an optional service that exposes the JobManager's REST port as a Kubernetes NodePort:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
taskmanager-query-state-service.yaml — an optional service that exposes the TaskManager's queryable-state port as a Kubernetes NodePort:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
The configuration files above are shared; the two deployments below are specific to session mode.
jobmanager-session-deployment-non-ha.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
[4] Create the Flink cluster
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
To reverse-engineer a Dockerfile from an existing image, you can use whaler:
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
Check:
kubectl get pods,svc -n flink -owide
Web UI address:
http://192.168.182.110:30081/#/overview
[5] Submit a job
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
[6] Delete the Flink cluster
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
[7] Access the Flink Web UI
The port is the NodePort defined in jobmanager-rest-service.yaml:
http://192.168.182.110:30081/#/overview
4) Application mode (recommended)
A basic Flink Application cluster deployment in Kubernetes has three components:
- an Application which runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1. Native Kubernetes mode (commonly used)
[1] Build the image: Dockerfile
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# export in a RUN layer does not persist into the final image; set the locale via ENV instead
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
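TopSpeedWindowing.jar here is simply the example shipped with the Flink distribution; copy it next to the Dockerfile before building:

```bash
cp flink-1.14.6/examples/streaming/TopSpeedWindowing.jar .
```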
Build the image:
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache
Push the image:
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
Delete the images (if needed):
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount
# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Create the Flink cluster and submit the job
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
[Note] local:// is the only scheme supported in application mode. Here "local" means the filesystem inside the pod/container, not the host machine.
Check:
kubectl get pods,svc -n flink
kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
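To stop the job from the CLI, cancel it; in application mode this also tears the cluster down. `<jobId>` is a placeholder taken from `./bin/flink list` output.

```bash
./bin/flink cancel --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink <jobId>
```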
[4] Delete the Flink cluster
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
2. Standalone mode
[1] Build the image: Dockerfile
The startup script docker-entrypoint.sh is identical to the one shown in the session-mode Standalone section above, so it is not repeated here.
The Dockerfile is likewise identical to the session-mode Standalone Dockerfile above.
# build the image
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache
# push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
# delete the local image (if needed)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
[2] Create the namespace and serviceaccount
# create the namespace
kubectl create ns flink
# create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
[3] Compose the YAML files
flink-configuration-configmap.yaml, jobmanager-service.yaml, jobmanager-rest-service.yaml and taskmanager-query-state-service.yaml are the same as in the session-mode Standalone section above; only the two files below are specific to application mode.
jobmanager-application-non-ha.yaml (non-HA):
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
> [Note] Mind the hostPath mounted for the job artifacts above; it is best to use a shared directory (e.g. NFS) here so every node sees the same jars.

taskmanager-job-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the image; change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
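Before creating the Job, drop the example jar into the directory the hostPath volume points at. The paths follow this article's example values, and I assume the WordCount class referenced above comes from the distribution's batch examples.

```bash
mkdir -p /mnt/nfsdata/flink/application/job-artifacts
cp flink-1.14.6/examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts/
```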
[4] Create the Flink cluster and submit the job
kubectl create ns flink

# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
Check:
kubectl get pods,svc -n flink
[5] Delete the Flink cluster
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
[6] Inspect
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash
That concludes this walkthrough of Flink on K8s. It only uses the official examples for demonstration; enterprise cases will follow in later posts. If you have any questions, feel free to leave me a comment; more to come.