Flink 1.10 Native Kubernetes 原理与实践

Stella981
• 阅读 509

https://yq.aliyun.com/articles/745483

作者:周凯波(宝牛)

千呼万唤始出来,在 Kubernetes 如火如荼的今天,Flink 社区终于在 1.10 版本提供了对 Kubernetes 的原生支持,也就是 Native Kubernetes Integration。不过还只是Beta版本,预计会在 1.11 版本里面提供完整的支持。

我们知道,在 Flink 1.9 以及之前的版本里面,如果要在 Kubernetes 上运行 Flink 任务是需要事先指定好需要的 TaskManager(TM) 的个数以及CPU和内存的。这样的问题是:大多数情况下,你在任务启动前根本无法精确的预估这个任务需要多少个TM。如果指定的TM多了,会导致资源浪费;如果指定的TM个数少了,会导致任务调度不起来。本质原因是在 Kubernetes 上运行的 Flink 任务并没有直接向 Kubernetes 集群去申请资源。

Flink 在 1.10 版本完成了Active Kubernetes Integration的第一阶段,支持了 session clusters。后续的第二阶段会提供更完整的支持,如支持 per-job 任务提交,以及基于原生 Kubernetes API 的高可用,支持更多的 Kubernetes 参数如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味着 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申请新的 Pod,类似于 Flink 中对 Yarn 和 Mesos 的集成所做的那样。在多租户环境中,用户可以利用 Kubernetes 里面的 namespace 做资源隔离启动不同的 Flink 集群。当然,Kubernetes 集群中的用户帐号和赋权是需要提前准备好的。

原理

Flink 1.10 Native Kubernetes 原理与实践

工作原理如下(段首的序号对应图中箭头所示的数字):

  1. Flink 客户端首先连接 Kubernetes API Server,提交 Flink 集群的资源描述文件,包括 configmap,job manager service,job manager deployment 和 Owner Reference
  2. Kubernetes Master 就会根据这些资源描述文件去创建对应的 Kubernetes 实体。以我们最关心的 job manager deployment 为例,Kubernetes 集群中的某个节点收到请求后,Kubelet 进程会从中央仓库下载 Flink 镜像,准备和挂载 volume,然后执行启动命令。在 flink master 的 pod 启动后,Dispacher 和 KubernetesResourceManager 也都启动了。

前面两步完成后,整个 Flink session cluster 就启动好了,可以接受提交任务请求了。

  1. 用户可以通过 Flink 命令行即 flink client 往这个 session cluster 提交任务。此时 job graph 会在 flink client 端生成,然后和用户 jar 包一起通过 RestClinet 上传。
  2. 一旦 job 提交成功,JobSubmitHandler 收到请求就会提交 job 给 Dispatcher。接着就会生成一个 job master。
  3. JobMaster 向 KubernetesResourceManager 请求 slots。
  4. KubernetesResourceManager 从 Kubernetes 集群分配 TaskManager。每个TaskManager都是具有唯一表示的 Pod。KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件,里面有 Flink Master 的 service name 作为地址。这样在 Flink Master failover之后,TaskManager 仍然可以重新连上。
  5. Kubernetes 集群分配一个新的 Pod 后,在上面启动 TaskManager。
  6. TaskManager 启动后注册到 SlotManager。
  7. SlotManager 向 TaskManager 请求 slots。
  8. TaskManager 提供 slots 给 JobMaster。然后任务就会被分配到这个 slots 上运行。

实践

Flink 的文档上对如何使用已经写的比较详细了,不过刚开始总会踩到一些坑。如果对 Kubernetes 不熟,可能会花点时间。

(1) 首先得有个 Kubernetes 集群,会有个 ~/.kube/config 文件。尝试执行 kubectl get nodes 看下集群是否正常。

如果没有这个 ~/.kube/config 文件,会报错:

2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
    at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
    at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known

(2) 提前创建好用户和赋权(RBAC)

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

如果没有创建用户,使用默认的用户去提交,会报错:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. 

Message: Forbidden!Configured service account doesn't have access. 
Service account may have been revoked. pods is forbidden: 
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".

(3) 这一步是可选的。默认情况下, JobManager 和 TaskManager 只会将 log 写到各自 pod 的 /opt/flink/log 。如果想通过 kubectl logs 看到日志,需要将 log 输出到控制台。要做如下修改 FLINK_HOME/conf 目录下的 log4j.properties 文件。

log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

然后启动 session cluster 的命令行需要带上参数:

-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

(4) 终于可以开始启动 session cluster了。如下命令是启动一个每个 TaskManager 是4G内存,2个CPU,4个slot 的 session cluster。

bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4

更多的参数详见文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用 kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f 就能看到日志了。

如果出现大量的这种日志(目前遇到是云厂商的LoadBalance liveness探测导致):

2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)

可以暂时在 log4j.properties 里面配置上:

log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file

这个日志太多会导致 WebUI 上打开 jobmanger log 是空白,因为文件太大了前端无法显示。

如果前面第(1)和第(2)步没有做,会出现各种异常,通过 kubectl logs 就能很方便的看到日志了。

Session cluster 启动后可以通过 kubectl get pods,svc 来看是否正常。

通过端口转发来查看 Web UI:

kubectl port-forward service/kaibo-test 8081

打开 http://127.0.0.1:8001 就能看到 Flink 的 WebUI 了。

(5) 提交任务

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar

我们从 Flink WebUI 页面上可以看到,刚开始启动时,UI上显示 Total/Available Task Slots 为0, Task Managers 也是0。随着任务的提交,资源会动态增加。任务停止后,资源就会释放掉。

在提交任务后,通过 kubectl get pods 能够看到 Flink 为 TaskManager 分配了新的 Pod。

Flink 1.10 Native Kubernetes 原理与实践

(6) 停止 session cluster

echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true

也可以手工删除资源:

kubectl delete service/<ClusterID>

总结

可以看到,Flink 1.10 版本对和 Kubernetes 的集成做了很好的尝试。期待社区后续的 1.11 版本能对 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基于原生 Kubernetes API 的高可用。最新进展请关注 FLINK-14460

点赞
收藏
评论区
推荐文章
秃头王路飞 秃头王路飞
6个月前
webpack5手撸vue2脚手架
webpack5手撸vue相信工作个12年的小伙伴们在面试的时候多多少少怕被问到关于webpack方面的知识,本菜鸟最近闲来无事,就尝试了手撸了下vue2的脚手架,第一次发帖实在是没有经验,望海涵。languageJavaScript"name":"vuecliversion2","version":"1.0.0","desc
浅梦一笑 浅梦一笑
6个月前
初学 Python 需要安装哪些软件?超级实用,小白必看!
编程这个东西是真的奇妙。对于懂得的人来说,会觉得这个工具是多么的好用、有趣,而对于小白来说,就如同大山一样。其实这个都可以理解,大家都是这样过来的。那么接下来就说一下python相关的东西吧,并说一下我对编程的理解。本人也是小白一名,如有不对的地方,还请各位大神指出01名词解释:如果在编程方面接触的比较少,那么对于软件这一块,有几个名词一定要了解,比如开发环
blmius blmius
1年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
光头强的博客 光头强的博客
6个月前
Java面向对象试题
1、请创建一个Animal动物类,要求有方法eat()方法,方法输出一条语句“吃东西”。创建一个接口A,接口里有一个抽象方法fly()。创建一个Bird类继承Animal类并实现接口A里的方法输出一条有语句“鸟儿飞翔”,重写eat()方法输出一条语句“鸟儿吃虫”。在Test类中向上转型创建b对象,调用eat方法。然后向下转型调用eat()方
刚刚好 刚刚好
6个月前
css问题
1、在IOS中图片不显示(给图片加了圆角或者img没有父级)<div<imgsrc""/</divdiv{width:20px;height:20px;borderradius:20px;overflow:h
小森森 小森森
3天前
租房类微信小程序-基于微信云开发-小程序端集成了管理员后台-一键部署,快速发布
温馨提醒本项目使用MITLicense协议,仅适用于学习交流,并且不提供无偿的、不提供无偿的、不提供无偿的维护修改服务(但可提issue)。若直接将本项目用于商用,因本项目带来的所有后果由使用者自行承担。如需商用升级版,请联系我微信,微信二维码在本博客页面右上角在此奉劝某些人,请尊重作者的劳动成果,做人积点德吧!最近发现有人拿我的源码进行二次分
小森森 小森森
6个月前
校园表白墙微信小程序V1.0 SayLove -基于微信云开发-一键快速搭建,开箱即用
后续会继续更新,敬请期待2.0全新版本欢迎添加左边的微信一起探讨!项目地址:(https://www.aliyun.com/activity/daily/bestoffer?userCodesskuuw5n)\2.Bug修复更新日历2.情侣脸功能大家不要使用了,现在阿里云的接口已经要收费了(土豪请随意),\\和注意
晴空闲云 晴空闲云
6个月前
css中box-sizing解放盒子实际宽高计算
我们知道传统的盒子模型,如果增加内边距padding和边框border,那么会撑大整个盒子,造成盒子的宽度不好计算,在实务中特别不方便。boxsizing可以设置盒模型的方式,可以很好的设置固定宽高的盒模型。盒子宽高计算假如我们设置如下盒子:宽度和高度均为200px,那么这会这个盒子实际的宽高就都是200px。但是当我们设置这个盒子的边框和内间距的时候,那
艾木酱 艾木酱
5个月前
快速入门|使用MemFire Cloud构建React Native应用程序
MemFireCloud是一款提供云数据库,用户可以创建云数据库,并对数据库进行管理,还可以对数据库进行备份操作。它还提供后端即服务,用户可以在1分钟内新建一个应用,使用自动生成的API和SDK,访问云数据库、对象存储、用户认证与授权等功能,可专
helloworld_28799839 helloworld_28799839
6个月前
常用知识整理
Javascript判断对象是否为空jsObject.keys(myObject).length0经常使用的三元运算我们经常遇到处理表格列状态字段如status的时候可以用到vue