kubernetes 中的垃圾回收机制

一、概述

一个运行中的 kubernetes 集群,存储了非常多的相互关联的资源,比如我们常用的 deployment,replicaset 和 pod,就是一组有关联的资源。我们在创建 deployment 时,相关的控制器就会自动创建出 replicaset,之后 replicaset 的控制器又会创建出 pod 来运行我们部署的服务。那么同样的,我们肯定也希望在删除 deployment 之后,会自动删除 replicaset 和 pod。这个机制就叫做垃圾回收(下面简称 GC)。

在早期的版本中,GC 是由客户端实现的,比如使用 kubectl delete deployment nginx 这样的命令,kubectl 会删除 pod 和 replicaset。但是这种方式增加了客户端的实现复杂度,不利于统一管理。因此提出了在服务端实现 GC 的需求。实现 GC 有三个主要目标,我们在之后分析的时候,也主要是围绕这三个主要目标进行。

  • 在服务端支持级联删除
  • 中心化级联删除的逻辑,而不是分布在各个组件内
  • 可以选择不删除被依赖的资源。如只删除 deployment,但是保留 replicaset 和 pod

kubernetes 的 GC 是在 controller manager 中,作为一个单独的 controller 来实现的。它通过 discovery client 来动态发现并监听集群中所有支持 delete,listwatch的资源。然后构造资源之间的关系图来记录资源之间的依赖关系。

二、预备知识

为了更好的阐述 kubernetes 的 GC 机制,这里先将一些 k8s 基本知识做一些阐述。

  • finalizer: finalizer 可以翻译为终结期。是一种用来保证资源在被删除之前,能够有机会做一些清理工作的机制。
  • kubernetes 的删除传播策略有三种:
    1. Orphan. 这种策略下,会保留被依赖的资源。如只删除 deployment,但是保留 replicaset 和 pod。
    2. Background. 从 etcd 中删除资源,被依赖的资源由 GC 机制来删除。
    3. Foreground. apiserver 不会删除该资源。而是在它的 finalizer 中添加 foregroundDeletion,并且设置当前的删除时间戳。然后 GC 会先从 etcd 中删除有 ownerReference.blockOwnerDeletion=true 的被依赖资源。最后再删除当前资源。
  • UID。k8s 中的每个资源都有一个唯一的 UID。这个 UID 在整个集群的生命周期中,对于每一个资源来说都是唯一的。所有在标记资源的依赖关系时,需要使用 UID。
  • ownerReferences。每个资源的 metadata 中都会有这个字段,它是一个数组,用来表示该资源的 owner 有哪些。每当 owner 资源被删除,就会从这个数组中移除。当所有的 owner 都被删除后,GC 就会回收该资源。
  • Dependents。如果一组资源 G 的 ownerReference 指向某个具体的资源 A。那个 A 的 dependents 就是 G

三、垃圾回收的实现机制

kubernetes 的 GC 主要由两部分组成:

  • GraphBuilder 主要用来使用 monitors 监听 apiserver 上的所有资源,通过将所有资源的事件插入到 graphChanges 队列中,然后调用 processGraphChanges 方法,从队列中依次取出元素,构建资源之间的依赖关系。并根据情况插入到 attemptToDelete 或 attemptToOrphan 队列中。
  • GarbageCollector 负责从 attemptToDelete 和 attemptToOrphan 队列中取出资源,然后通过一系列负责的过程,判断是否能删除,并进行相关的处理。

因此,对于垃圾回收实现机制的分析,主要从这两部分进行。

3.1 graph builder 的实现

graph builder 可以看做是集群资源状态的维护者。其本身并不会通过 apiserver 修改任何的资源。其定义如下:

// GraphBuilder 处理 informers 提供的事件,更新 uidToNode,使用 LRU 缓存依赖资源,并将
// 资源送入到 attemptToDelete 和 attemptToOrphan 队列
type GraphBuilder struct {
    restMapper meta.RESTMapper

  // 每个 monitor 都会 list/watches 一个资源,结果会被导入到 dependencyGraphBuilder 中·
    monitors    monitors
    monitorLock sync.RWMutex

    informersStarted <-chan struct{}
    stopCh <-chan struct{}
    running bool

    metadataClient metadata.Interface
  // monitors 是该队列的生产者,graphBuilder 根据这些改变来修改内存中的 graph
    graphChanges workqueue.RateLimitingInterface
  // 资源 uid 对应到 graph 中的 node
    uidToNode *concurrentUIDToNode
  // GraphBuilder 是 attemptToDelete 和 attemptToOrphan 的生产者,GC 是消费者。
    attemptToDelete workqueue.RateLimitingInterface
    attemptToOrphan workqueue.RateLimitingInterface
  // GraphBuilder 和 GC 共享 absentOwnerCache. 目前已知的不存在的对象会被添加到缓存中
    absentOwnerCache *UIDCache
    sharedInformers  controller.InformerFactory
    ignoredResources map[schema.GroupResource]struct{}
}

组成 graph 的 node 定义如下:

// 单线程的 GraphBuilder.processGraphChanges() 是 nodes 的唯一 writer。多线程的 GarbageCollector.attemptToDeleteItem() 读取 nodes。
type node struct {
    identity objectReference
    dependentsLock sync.RWMutex
    // dependents 是当前 node 的依赖资源。比如当前 node 是 replicaset,那么这里面保存的应该就是多个 pod
    dependents map[*node]struct{}
    // this is set by processGraphChanges() if the object has non-nil DeletionTimestamp
    // and has the FinalizerDeleteDependents.
    deletingDependents     bool
    deletingDependentsLock sync.RWMutex
    // this records if the object's deletionTimestamp is non-nil.
    beingDeleted     bool
    beingDeletedLock sync.RWMutex
    // this records if the object was constructed virtually and never observed via informer event
    virtual     bool
    virtualLock sync.RWMutex
    // when processing an Update event, we need to compare the updated
    // ownerReferences with the owners recorded in the graph.
    owners []metav1.OwnerReference
}

GraphBuilder 会和 apiserver 同步 monitors,然后为每种资源创建一个 monitor,通过 informer 同步资源的状态。所有的资源都会直接进入 graphChanges 队列。然后在 processGraphChanges 方法中统一处理。

对于 Add 和 Update 事件:

  • 如果当前资源不存在 graph 中,就会实例化出一个 Node 对象,加入到 graph 中。然后将该 node 加入到其 owners 的 dependents 数组中。 这里有一个细节,就是有可能出现一种情况,当前 node 所代表的资源通过 informer 被同步到本地缓存中,但是其 owner 还没有被同步过来。这样更新 owners 的 dependents 就会有遗漏。因此每个 node 都有一个 virtual 字段,在 owner 还没有被同步时,实例化一个虚拟的 owner node 加入到 graph 中。并且将这个虚拟 node 添加到 attemptToDelete 队列中,由之后的 GC 处理。如果这个虚拟 node 在之后被 processGraphChanges 发现了,就会调用 markObserved() 将 virtual 置为 false。
  • 如果已经存在了,那么就要比对新旧资源的 ownerReferences 的变化情况。这里会计算出 added, removed 和 changed。ownerReferences 的变化可能会带来以下要处理的情况。
    • 之前提到 Foreground 的删除,ownerReference 带有 blockOwnerDeletion=true 的资源会 block 的 owner 的删除。那么这里因为 ownerReferences 的变化,需要做以下两点:
    • 对于 removed 的 ownerReference,如果 blockOwnerDeletion 为 true。就说明当前不允许再 block 该 node owner 的删除。因此将 owner 放到 attemptToDelete 队列中,等待 GC 的处理。
    • 对于更新的 ownerReference,如果之前 blockOwnerDeletion 为 true,现在为 false,那么也要加入到 attemptToDelete 队列。
    • 对于 added 和 removed,都需要更新对应的 owner node 的 dependents。
  • 无论是 Add 还是 Update 事件,都会调用 processTransitions 方法,
    • 如果 old object 没有被删除或者没有 orphan finalizer,但是 new object 被删除了或者有 orphan finalizer,就会将该节点插入到 attemptToOrphan 队列。
    • 如果 old object 没有被删除或者没有 foregroundDeletion finalizer,但是 new object 被删除了或者有 foregroundDeletion finalizer,就会将该节点的 dependents 都插入到 attemptToDelete 队列,再将节点插入到 attemptToDelete 队列。

对于删除事件

  • 会从当前的 graph 中移除该 node。起始就是从 uidToNode 中删除该 node,然后更新所有的 owner 的 dependents。
  • 如果当前 node 的 dependents 大于 0,就将当前 node 添加到 absentOwnerCache 中。
  • 将该 node 的 dependents 将入到 attemptToDelete 队列中(垃圾回收)。
  • 最后,从该 node 中找到处于 deletingDependents=true 状态的 owner,也插入到 attemptToDelete 队列中。这里是为了让 GC 检查该 owner 是不是所有的 dependents 都被删除了,如果是,就将该 owner 也删除(这里 owner 处于 deletingDependents,说明使用了 foregroundDeletion,因此需要先删除 dependents,再删除 owner)。

因此可以知道,以下状态的资源会被插入到 attemptToDelete 队列中:

  • finalizers 中有 foregroundDelete
  • owner 的 finalizers 中有 foregroundDelete
  • owner 资源被删除
  • Dependents 中有资源被删除,并且当前状态还不是正在删除 deletingDependents
  • owner 处于 deletingDependents

以下状态的资源会被插入到 attemptToOrphan 队列中:

  • finalizers 中有 orphan

3.2 GarbageCollector 的实现

在 3.1 中提到,GC 会消费 GraphBuilder 的 attemptToDelete 和 attemptToOrphan 队列,来执行 delete 或 orphan 操作。因此我们这里主要关心,什么样的资源可以被 GC delete 或者 orphan。

3.2.1 attemptToDeleteItem

  • 对于 DeletionTimestamp 不为空,并且不处于删除 dependents 的资源。直接跳过处理流程。
  • 如果资源处于 deletingDependents 状态,则统计 blockOwnerDeletion=true的 dependents 个数。
    • 如果为 0,说明当前资源可以删除了,则移除 foregroundDeletion 这个 finalizer 即可。
    • 否则将 dependents 插入到 attemptToDelete 队列中
    • 之后会退出这个循环
  • 对资源的 ownerReferences 进行分类
    • Dangling: owner 对应的资源实际已经不存在了。
    • waitingForDependentsDeletion: owner 的 DeletionTimeStamp 不为空,但是有 foregroundDeletion,所以正在等待 dependents 删除
    • solid: owner 存在,并且不是 waitingForDependentsDeletion
  • 如果 solid 不为空,那么当前资源就不能被 GC,因此只需要通过 patch 来移除 dangling 和 waitingForDependentsDeletion 的 ownerReferences
  • 如果 waitingForDependentsDeletion 不为空并且当前资源的 dependents 不为空。这个判断用来处理循环依赖的异常情况,因为当前资源并不处于删除状态且有 dependents,其 owner 又在等待该 item 的删除,说明这里有一个循环依赖。解决办法就是通过 patch 去更改该资源的 blockOwnerDeletion 为 false。
  • 如果上面两种情况都不是。就会根据当前资源的 finalizer 来删除资源
    • orphan
    • foreground
    • Background

因此可以得出,以下状态的资源会被 GC 调用删除请求:

  • 资源处于 deletingDependents 状态,且其没有 dependents 的 blockOwnerDeletion 为 true。先移除 foregroundDeletion finalizer,然后删除
  • 资源的 owner 和 dependents 都有 blockOwnerDeletion。如果 dependents 处于 deletingDependents 状态。为了防止存在循环依赖,会先把 owner 的 unblock。然后使用 foreground 来删除当前资源。
  • 资源没有 solid 的 owner,那么这个资源就是应该被级联删除的资源。所以根据该资源的 finalizer 来删除。默认使用 background 的方式删除。

3.2.2 attemptToOrphan

orphan 是防止某些情况下资源被 GC 回收的方式。attemptToOrphan 的逻辑要简短一些,如下:

  • 移除 dependents 对当前资源 ownerReferences
  • 移除该资源的 orphan finalizer (这个更新事件会被 GraphBuilder 获取到,然后该资源符合进入 attemptToDelete 队列的条件。之后再由 GC 的处理,最终会被删除。)

总结

根据以上流程,附上自己整理的一个整体的 GC 流程图

k8s-gc

参考

garbage collection

从 iptables 看 k8s service 的实现机制

概述

k8s service 可以看做是多个 Pod 的负载均衡。有以下几种 service:

  • LoadBalancer
  • ClusterIP
  • NodePort
  • ExternalName

在 service 的演进中,从最初的 userspace 的方案变成 iptables 和 ipvs 的方案,其中,ipvs 主要是解决了 iptables 的性能问题。这篇文章主要分析 iptables 如何实现 service 的负载均衡。

ClusterIP

ClusterIP 是提供在集群中访问 Service 的方案,通常每个 Service 都会分配一个 VIP,然后为多个 Pod 提供负载均衡。这里我们创建两个副本的 nginx 部署,以及一个 nginx service。具体信息如下:

$ kubectl get endpoints nginx
NAME    ENDPOINTS                     AGE
nginx   172.17.0.4:80,172.17.0.5:80   65m

$ kubectl get service nginx
NAME    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)   AGE
nginx   ClusterIP   10.111.67.225   <none>        80/TCP    65m

在集群中访问 nginx.default.svc.cluster.local 时,DNS 会将这个地址解析到 Service 的 IP 上,也就是 10.111.67.225。下面我们看看 iptables 是如何将访问这个地址的流量转到真实的 Pod 上的。

首先看一下 nat 表上的 OUTPUT 链:

$ iptables -t nat -nL OUTPUT
Chain OUTPUT (policy ACCEPT)
target     prot opt source               destination
KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */
DOCKER     all  --  0.0.0.0/0           !127.0.0.0/8          ADDRTYPE match dst-type LOCAL

第一条规则会匹配所有的流量,然后跳到 KUBE-SERVICES 这条链上。我们看一下 KUBE-SERVICES 的具体内容:

$ iptables -t nat -nL KUBE-SERVICES
Chain KUBE-SERVICES (2 references)
target     prot opt source               destination
KUBE-SVC-NPX46M4PTMTKRN6Y  tcp  --  0.0.0.0/0            10.96.0.1            /* default/kubernetes:https cluster IP */ tcp dpt:443
KUBE-SVC-P4Q3KNUAWJVP4ILH  tcp  --  0.0.0.0/0            10.111.67.225        /* default/nginx:http cluster IP */ tcp dpt:80
KUBE-SVC-TCOU7JCQXEZGVUNU  udp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:dns cluster IP */ udp dpt:53
KUBE-SVC-ERIFXISQEP7F7OF4  tcp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:dns-tcp cluster IP */ tcp dpt:53
KUBE-SVC-JD5MR3NA4I4DYORP  tcp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:metrics cluster IP */ tcp dpt:9153
KUBE-NODEPORTS  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service nodeports; NOTE: this must be the last rule in this chain */ ADDRTYPE match dst-type LOCAL

这里前面的 KUBE-SVC-* 都是根据 destination, protocol 和目的端口号来匹配的,根据我们的 service 地址和端口号以及协议,可以定位到 KUBE-SVC-P4Q3KNUAWJVP4ILH 这条规则可以匹配,然后跳到这条链上。我们接着看这条链定义了什么:

$ iptables -t nat -nL KUBE-SVC-P4Q3KNUAWJVP4ILH
Chain KUBE-SVC-P4Q3KNUAWJVP4ILH (1 references)
target     prot opt source               destination
KUBE-SEP-GL7IUDQTUTXSADHR  all  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */ statistic mode random probability 0.50000000000
KUBE-SEP-VMO3WCKZND6ZICDD  all  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */

有两条规则,根据第一条规则后面的内容,我们可以知道这就是使用 iptables 实现负载均衡的地方了。第一条规则有 50% 的匹配几率。如果匹配到了其中一条,就会跳到另外一个链上。比如:

$ iptables -t nat -nL KUBE-SEP-GL7IUDQTUTXSADHR
Chain KUBE-SEP-GL7IUDQTUTXSADHR (1 references)
target     prot opt source               destination
KUBE-MARK-MASQ  all  --  172.17.0.4           0.0.0.0/0            /* default/nginx:http */
DNAT       tcp  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */ tcp to:172.17.0.4:80

其中第一条规则的 source 是 Pod 的 IP,在访问 Service 时目前还不会匹配,于是我们看第二条规则,将目的 IP 和 Port 改写成 172.17.0.4:80,也就是我们的 Pod IP,这样流量就经过负载均衡指向了我们的 Pod了。

NodePort

我们将上面的 Service 改成 NodePort

nginx        NodePort    10.111.67.225   <none>        80:30000/TCP   34h

然后查询机器上的 30000 端口。

$ ss -lp | grep 30000
tcp               LISTEN              0                    0                                                                                            0.0.0.0:30000                                                 0.0.0.0:*                  users:(("kube-proxy",pid=4006,fd=8))

可以看到, kube-proxy 监听了 30000 端口,同时我们看 nat 表上的 PREROUTING 链。

KUBE-SERVICES  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service portals */

再看 KUBE-SERVICES

KUBE-SVC-TCOU7JCQXEZGVUNU  udp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:dns cluster IP */ udp dpt:53
KUBE-SVC-ERIFXISQEP7F7OF4  tcp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:dns-tcp cluster IP */ tcp dpt:53
KUBE-SVC-JD5MR3NA4I4DYORP  tcp  --  0.0.0.0/0            10.96.0.10           /* kube-system/kube-dns:metrics cluster IP */ tcp dpt:9153
KUBE-SVC-NPX46M4PTMTKRN6Y  tcp  --  0.0.0.0/0            10.96.0.1            /* default/kubernetes:https cluster IP */ tcp dpt:443
KUBE-SVC-P4Q3KNUAWJVP4ILH  tcp  --  0.0.0.0/0            10.111.67.225        /* default/nginx:http cluster IP */ tcp dpt:80
KUBE-NODEPORTS  all  --  0.0.0.0/0            0.0.0.0/0            /* kubernetes service nodeports; NOTE: this must be the last rule in this chain */ ADDRTYPE match dst-type LOCAL

最后一条 KUBE-NODEPORTS 可以匹配到,这里有个匹配条件,那就是 ADDRTYPE match dst-type LOCAL。注意这里的 LOCAL 指的是本机网卡上存在的地址,也就是这条数据是发到本机,那么就能匹配。

KUBE-NODEPORTS 的规则如下:

KUBE-MARK-MASQ  tcp  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */ tcp dpt:30000
KUBE-SVC-P4Q3KNUAWJVP4ILH  tcp  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */ tcp dpt:30000

第一条规则是替换源地址为本机出口的网卡地址。第二条规则如下:

KUBE-SEP-F3MS6OIYSABTYGOY  all  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */ statistic mode random probability 0.50000000000
KUBE-SEP-VMO3WCKZND6ZICDD  all  --  0.0.0.0/0            0.0.0.0/0            /* default/nginx:http */

这里我们在 ClusterIP 中就分析了实现方法,因此这里忽略。

LoadBalancer

LoadBalancer 本身不是由 Kubernetes 提供的,其原理说起来也不难,我们先创建一个 LoadBalancer 的 Service 看看:

nginx        LoadBalancer   10.111.67.225   <pending>     80:32014/TCP   34h

这里因为我的本地集群没有 LoadBalancer,所以一直处于 Pending 状态。但是我们可以看到,这里还有一个 80:32014。和上面的 NodePort 输出一致。也就是说创建 LoadBalancer 时,会在 Pod 所在的机器上开启 NodePort,然后由外部的 LoadBalancer 将负载均衡过的流量带到机器的指定的 NodePort 上。

一些有意思的参数

这里顺便多提几个有意思的Service 参数

externalTrafficPolicy:可选值有 LocalCluster

  • Local: 流量只会被导向本机的 Pod,这样就少一次包的转发,提高性能。但是缺点是如果容易导致负载不均衡。
  • Cluster: 在集群范围内转发流量

如果能保证 Pod 均匀的分布在不同的节点上,那么外部的 LoadBalancer 配合 Local 的 externalTrafficPolicy 可以带来更好的性能。

sessionAffinity: 会话亲和性,可以设置为 ClientIP,来达到将同一个 IP 的会话转发到相同的 Pod 上。其也是通过 iptables 实现的。

KUBE-SEP-Q7ZFI57LOFFPF3HN  all  --  0.0.0.0/0            0.0.0.0/0            /* test/nginx-session-affinity:http */ recent: CHECK seconds: 10800 reap name: KUBE-SEP-Q7ZFI57LOFFPF3HN side: source mask: 255.255.255.255
KUBE-SEP-LWUZWBNY6M3CYJ2M  all  --  0.0.0.0/0            0.0.0.0/0            /* test/nginx-session-affinity:http */ recent: CHECK seconds: 10800 reap name: KUBE-SEP-LWUZWBNY6M3CYJ2M side: source mask: 255.255.255.255
KUBE-SEP-Q7ZFI57LOFFPF3HN  all  --  0.0.0.0/0            0.0.0.0/0            /* test/nginx-session-affinity:http */ statistic mode random probability 0.50000000000
KUBE-SEP-LWUZWBNY6M3CYJ2M  all  --  0.0.0.0/0            0.0.0.0/0            /* test/nginx-session-affinity:http */

这个 iptables 的前两条规则就是在做 iptables 的检查。

[Gaia Scheduler] gpu-manager 启动流程分析

概述

Gaia scheduler 是腾讯开源的在 Kubernetes 集群中做 GPU 虚拟化的方案,实现了为容器分配虚拟化 GPU 资源并加以限制,它的最大的优势就是不需要特殊的硬件支持,并且性能损耗很小。关于它的论文,地址在这里:Gaia Scheduler: A Kubernetes-Based Scheduler Framework。如果想要理解这个项目,强烈建议先读这篇论文。

Gaia Scheduler 可以分为 4 个组件:

  • GPU Manager: 作为 device plugin 向 kubelet 注册。共注册了两个设备,包括 vcore 和 vmemory,支持两种计算资源:tencent.com/vcuda-coretencent.com/vcuda-memory,分别用来做 GPU 计算资源和 GPU 内存资源的请求和限制。

  • GPU Scheduler: 这里的 scheduler 并不是 kubernetes 的调度器,是 GPU Manager 在收到 kubelet 的 Allocate 调用后,它需求将设备挂载给容器。为了实现最佳的 GPU 挂载,就有这样一个专门的 Scheduler 来根据节点上当前的 GPU 拓扑和资源占用情况进行调度。

  • vGPU Manager: vGPU Manager 是具体负责管理容器的组件,包括监控容器状态,传递配置,和容器内的vGPU Library通信,以及在容器死亡后进行回收操作。

  • vGPU Library: vGPU Library 虽然相关的代码量不多,但它是 Gaia Scheduler 最重要的部分。因为它是实现 GPU 虚拟化的核心。通过覆盖容器中的 LD_LIBRARY_PATH 以及自定义了 libcuda-control.so 实现对 CUDA API 的拦截。

Gaia Scheduler 主要由三个项目组成: gpu-managervcuda-controllergpu-admission。但是这里的 gpu-manager 是 Gaia Scheduler 的主要实现,包含了上述的 4 个组件,vcuda-controller 就是 vGPU Library,已经被打包到了 gpu-manager 这个项目中。gpu-manager 需要配合 gpu-admission 项目来完成 GPU Scheduler 的工作。不要因此产生误解。下文中我们主要就 gpu-manager 这个项目进行分析。

启动流程分析

gpu-manager 本身主要作为 kubernetes 的 device plugin 来实现的,定义了两种设备: vcuda-corevcuda-memory,我们的应用通过 pod 的资源字段进行申请,然后 kube-scheduler 会根据节点上的资源状态进行调度。因此,你最好还需要了解 kubernetes 的 device plugin 的开发知识。关于 device plugin 的开发,可以看之前的一篇文章:Kubernetes开发知识–device-plugin的实现

启动参数

分析一个项目从启动参数开始,可以帮助我们快速了解:

  • driver: 这个是 GPU 的驱动,当前的默认值是 nvidia,很显然该项目可以扩展支持其他类型的 GPU。
  • extra-config: 额外的配置,这个参数暂时看不出来有什么特别
  • volume-config: 这里的 volume 指的是一些动态链接库和可执行文件的位置。也就是 gpu-manager 需要拦截调用的一些库
  • docker-endpoint: 用来挂载到容器中和 docker 做通信的,默认位置是 unix:////var/run/docker.sock
  • query-port: 统计信息服务的查询接口
  • query-port: 统计信息服务的监听地址
  • kubeconfig: 用来授权的配置文件
  • standalone: 暂时还不清楚的参数
  • sample-period: gpu-manager 会查询 gpu 设备的使用情况,这个参数用来设定采样周期
  • node-labels: 给节点自动打标签
  • hostname-override: gpu-manager 在运行时,只关注自己节点上的 pod,这主要是通过 hostname 来辨认的
  • virtual-manager-path: gpu-manager 会为所有需要虚拟 gpu 资源的 pod 创建唯一的文件夹,文件夹的路径就在这个地址下。
  • device-plugin-path: kubernetes 默认的 device plugin 的目录地址
  • checkpoint-path: gpu-manager 会产生 checkpoint 来当缓存用
  • share-mode: gpu-manager 最大的特点就是将一个物理 gpu 分成多个虚拟 gpu,也就是共享模式
  • allocation-check-period: 检查分配了虚拟 gpu 资源的 pod 的状态,及时回收资源
  • incluster-mode: 是否在集群内运行

服务启动

gpu-manager 推荐的部署方案是通过 kubernetes 的 daemonset,然后配置 node selector 调度到指定的节点上。然后 gpu-manager 就开始在指定节点上启动了。

srv := server.NewManager(cfg)
go srv.Run()

这里,我们需要看一下这个 srv 的具体实现,首先是它的结构体:

type managerImpl struct {
    config *config.Config

    allocator      allocFactory.GPUTopoService     // gpu 容器调度分配
    displayer      *display.Display                // gpu 使用情况可视化服务
    virtualManager *vitrual_manager.VirtualManager // 负责管理 vgpu

    bundleServer map[string]ResourceServer
    srv          *grpc.Server
}

config 包含了我们上面的所有参数,就不进去细看了。

allocator 负责在容器调度到节点上后,为其分配具体的设备资源。allocator 实现了探测节点上的 gpu 拓扑架构,然后以最佳性能,最少碎片为目的使用最优的方案进行资源分配。

displayer 是将 gpu 的使用情况输出,方便我们查看。

virtualManager 负责 vgpu 分配后的管理工作。

bundleServer 包含 vcore,vmemory,我们上面提到这两种资源以 device plugin 的方式进行注册,因此他们需要启动 grpc server。

srv: 将 gpu display server 注册到这个 grpc server 中。

接下来,我们就可以分析 srv.Run() 方法具体执行了哪些内容。为了先对整个流程有个大概的印象,我将内容整理成以下条目:

  • 启动 volumeManager,将节点上和 nvidia gpu (包括cuda) 的所有可执行文件和库移动到 /etc/gpu-manager/vdriver 中。并且将关键的库替换成 vcuda-control,实现 cuda 调用的拦截。
  • watchdog 创建 pod 缓存并监控 pod,之后所有关于 pod 的操作都来源于这里。
  • watchdog 给节点打上标签
  • 启动 virtualManager
  • gpu 拓扑结构感知。
  • 初始化资源分配器
  • 设置 vcuda, vmemory, display 的 grpc 服务
  • 启动 metrics 的 http 服务,主要是提供给 prometheus
  • 启动 vcuda,vmemory 的 grpc 服务
  • 启动 display 的 grpc 服务

接下来,我们具体来分析每一步是如何做的。当然,这里只会挑一些重点的部分。

volumeManager 的启动

func (vm *VolumeManager) Run() (err error) {
    // ldcache 是动态链接库的缓存信息
    cache, err := ldcache.Open()
    defer func() {
        if e := cache.Close(); err == nil {
            err = e
        }
    }()
    vols := make(VolumeMap)
    for _, cfg := range vm.Config {
        vol := &Volume{
            Path: path.Join(cfg.BasePath, cfg.Name),
        }

        if cfg.Name == "nvidia" {
            // nvidia 库的位置
            types.DriverLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        } else {
            // origin 库的位置
            types.DriverOriginLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        }

        for t, c := range cfg.Components {
            switch t {
            case "binaries":
                // 调用 which 来查找可执行文件的位置
                bins, err := which(c...)
                // 将实际位置存起来
                vol.dirs = append(vol.dirs, volumeDir{binDir, bins})
            case "libraries":
                // 是库的话,就从 ldcache 里面去找
                libs32, libs64 := cache.Lookup(c...)
                // 将 library 位置存起来
                vol.dirs = append(vol.dirs, volumeDir{lib32Dir, libs32}, volumeDir{lib64Dir, libs64})
            }
            vols[cfg.Name] = vol
        }
    }
    // 找到了需要的库位置之后,做 mirror 处理
    if err := vm.mirror(vols); err != nil {
        return err
    }
    return nil
}

这段代码的前半部分都是在查找指定的动态链接库和可执行文件,这些文件是在 volume.conf 这个配置文件中指定的,通过参数传进来。查找动态链接库时,使用的是 ldcache,查找可执行文件时,使用了系统的 which 指令。找到之后会将其所在位置记录下来。接着就是对找到的库做 mirror 处理。

func (vm *VolumeManager) mirror(vols VolumeMap) error {
    // nvidia 和 origin
    for driver, vol := range vols {
        if exist, _ := vol.exist(); !exist {
            // 这里的path是/etc/gpu-manager/vdriver下面
            if err := os.MkdirAll(vol.Path, 0755); err != nil {
                return err
            }
        }
        for _, d := range vol.dirs {
            vpath := path.Join(vol.Path, d.name)
            // 创建 bin lib lib64
            if err := os.MkdirAll(vpath, 0755); err != nil {
                return err
            }

            // For each file matching the volume components (blacklist excluded), create a hardlink/copy
            // of it inside the volume directory. We also need to create soname symlinks similar to what
            // ldconfig does since our volume will only show up at runtime.
            for _, f := range d.files {
                glog.V(2).Infof("Mirror %s to %s", f, vpath)
                if err := vm.mirrorFiles(driver, vpath, f); err != nil {
                    return err
                }

                if strings.HasPrefix(path.Base(f), "libcuda.so") {
                    driverStr := strings.SplitN(strings.TrimPrefix(path.Base(f), "libcuda.so."), ".", 2)
                    types.DriverVersionMajor, _ = strconv.Atoi(driverStr[0]) // 驱动版本号
                    types.DriverVersionMinor, _ = strconv.Atoi(driverStr[1])
                    glog.V(2).Infof("Driver version: %d.%d", types.DriverVersionMajor, types.DriverVersionMinor)
                }

                if strings.HasPrefix(path.Base(f), "libcuda-control.so") {
                    vm.cudaControlFile = f
                }
            }
        }
    }

    vCudaFileFn := func(soFile string) error {
        if err := os.Remove(soFile); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, soFile); err != nil {
            return err
        }

        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, soFile)

        l := strings.TrimRight(soFile, ".0123456789")
        if err := os.Remove(l); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, l); err != nil {
            return err
        }
        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, l)
        return nil
    }

    if vm.share && len(vm.cudaControlFile) > 0 {
        if len(vm.cudaSoname) > 0 {
            for _, f := range vm.cudaSoname {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }

        if len(vm.mlSoName) > 0 {
            for _, f := range vm.mlSoName {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码先会对所有上面查找到的库或可执行文件调用 mirrorFiles,但是记录下来了 libcuda.so 的版本号和 libcuda-control.so 的位置。注意,这个 libcuda-control 就是 vcuda-control 项目生成的用来拦截 cuda 调用的库。

然后将 cudaControlFile clone到所有 cudaSonamemlSoName 中库的位置。这个 clone 方法会先尝试硬链接过去,如果失败就直接复制过去。这里的 cudaControlFile 就是我们上面所说的 libcuda-control.so 啦。cudaSonamemlSoName 包含了所有需要被拦截调用的库。这样子就实现了拦截所有的 cuda 调用。下面我们在看一下 mirrorFiles 这个方法就可以了。

// driver 是配置文件中的 "nvidia" 或 "origin"
// vpath 是要 mirror 到的位置,在 /etc/gpu-manager/vdriver 下面
func (vm *VolumeManager) mirrorFiles(driver, vpath string, file string) error {
    // In computing, the Executable and Linkable Format (ELF, formerly named Extensible Linking Format), is a common standard file format for executable files, object code, shared libraries, and core dumps
    obj, err := elf.Open(file)
    defer obj.Close()

    // 黑名单机制,具体用处还不清楚,跟 nvidia 的驱动相关
    ok, err := blacklisted(file, obj)
    if ok {
        return nil
    }
    l := path.Join(vpath, path.Base(file))
    // 不管有没有,先尝试把 gpu-manager 里面的移除
    if err := removeFile(l); err != nil {
        return err
    }
    // clone 优先硬连接,其次是复制文件到指定位置
    if err := clone(file, l); err != nil {
        return err
    }
    // 从 elf 中获取当前库的 soname
    soname, err := obj.DynString(elf.DT_SONAME)
    if len(soname) > 0 {
        // 将获取到 soname 组成路径
        l = path.Join(vpath, soname[0])
        // 如果文件和它的soname不一致(是否可以认为这个文件是软链接过去的)
        if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
            return err
        }

        // XXX Many applications (wrongly) assume that libcuda.so exists (e.g. with dlopen)
        // Hardcode the libcuda symlink for the time being.
        if strings.Contains(driver, "nvidia") {
            // 这里为什么要移除 libcuda.so 和 libnvidia-ml.so 的软链接
            // 因为gpu调用会涉及到这两个库,这两个库会软链接到真实的库上。移除后替换成拦截的库
            // Remove libcuda symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libcuda.so") {
                os.Remove(l)
                vm.cudaSoname[l] = l
            }

            // Remove libnvidia-ml symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libnvidia-ml.so") {
                os.Remove(l)
                vm.mlSoName[l] = l
            }

            // XXX GLVND requires this symlink for indirect GLX support
            // It won't be needed once we have an indirect GLX vendor neutral library.
            if strings.HasPrefix(soname[0], "libGLX_nvidia") {
                l = strings.Replace(l, "GLX_nvidia", "GLX_indirect", 1)
                if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码中,先使用 blacklisted 排除一些不需要处理的库,然后尝试将库或可执行文件 clone 到我们的 /etc/gpu-manager/vdriver 下面。/etc/gpu-manager/vdriver 下面有两个文件夹,一个是 nvidia,保存了已经被我们拦截的库,一个是 origin,这里面是原始的未处理的库。同时,还将 libcuda.so 和 libnvidia-ml.so 移除了,这样就调用不到真实的库了,转而在之后用我们拦截的库来替换这几个文件。

至此,volumeManager 分析结束。

gpu 拓扑结构感知

关于 gpu 拓扑结构这一块,主要是为了在之后做资源分配时选择最优方案用的。腾讯也有分享过这一块的资料(腾讯基于 Kubernetes 的企业级容器云实践):

gpu 拓扑结构

这里不影响我们理解整个工作机制,所以先不分析。

初始化资源分配器

// 分配器,根据driver调用相应的分配器
initAllocator := allocFactory.NewFuncForName(m.config.Driver)
if initAllocator == nil {
    return fmt.Errorf("can not find allocator for %s", m.config.Driver)
}

m.allocator = initAllocator(m.config, tree, client)

这里的 initAllocator 对应的方法是:

//NewNvidiaTopoAllocator returns a new NvidiaTopoAllocator
func NewNvidiaTopoAllocator(config *config.Config, tree device.GPUTree, k8sClient kubernetes.Interface) allocator.GPUTopoService {
    runtimeRequestTimeout := metav1.Duration{Duration: 2 * time.Minute}
    imagePullProgressDeadline := metav1.Duration{Duration: 1 * time.Minute}
    dockerClientConfig := &dockershim.ClientConfig{
        DockerEndpoint:            config.DockerEndpoint,
        RuntimeRequestTimeout:     runtimeRequestTimeout.Duration,
        ImagePullProgressDeadline: imagePullProgressDeadline.Duration,
    }

    _tree, _ := tree.(*nvtree.NvidiaTree)
    cm, err := checkpoint.NewManager(config.CheckpointPath, checkpointFileName)
    if err != nil {
        glog.Fatalf("Failed to create checkpoint manager due to %s", err.Error())
    }
    alloc := &NvidiaTopoAllocator{
        tree:              _tree,
        config:            config,
        evaluators:        make(map[string]Evaluator),
        dockerClient:      dockershim.NewDockerClientFromConfig(dockerClientConfig),
        allocatedPod:      cache.NewAllocateCache(),
        k8sClient:         k8sClient,
        queue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        stopChan:          make(chan struct{}),
        checkpointManager: cm,
    }

    // Load kernel module if it's not loaded
    alloc.loadModule()

    // Initialize evaluator
    alloc.initEvaluator(_tree)

    // Read extra config if it's given
    alloc.loadExtraConfig(config.ExtraConfigPath)

    // Process allocation results in another goroutine
    go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan)

    // Recover
    alloc.recoverInUsed()

    // Check allocation in another goroutine periodically
    go alloc.checkAllocationPeriodically(alloc.stopChan)

    return alloc
}

allocator 调用 loadModule() 来启用 nvidia 的内核模块。

调用 initEvaluator(_tree) 来初始化评估器,这里的 _tree 就是感知到的 gpu 拓扑结构。

调用 loadExtraConfig(config.ExtraConfigPath) 来加载启动时传入的额外参数配置文件。

go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan) 创建了新的协程来处理分配结果。

recoverInUsed() 是恢复 gpu 分配结果。比如在 gpu-manager 重启之后,之前的 gpu 分配结果都丢失了,但是节点上还有大量的容器正在占用 gpu,这个方法会通过查找节点上存活的容器,通过 docker endpoint, 调用 InspectContainer 获取容器中占用的 device id,然后标记该设备和容器之间的占用关系。

go alloc.checkAllocationPeriodically(alloc.stopChan) 创建新的协程来周期性的检查资源分配情况。如果是 Failed 和 Pending 状态的容器,就根据错误信息检查是否应该删除它们,然后如果这些 pod 的控制器是 deployment 类似的,就尝试删除它们,这样控制器会重新创建这些 pod 进行调度,让这些 pod 恢复到正常运行状态。

启动各种服务

vcuda,vmemory 的 grpc 服务是 device plugin 的机制。metrics service 是提供给 prometheus 调用的,以监控该节点的相关信息。display 服务会打印 gpu 拓扑结构的相关信息。

Device plugin 的注册

Device plugin

这张图是 device plugin 注册的时序图。gpu-manager 的注册方法是:

func (m *managerImpl) RegisterToKubelet() error {
    socketFile := filepath.Join(m.config.DevicePluginPath, types.KubeletSocket)
    dialOptions := []grpc.DialOption{grpc.WithInsecure(), grpc.WithDialer(utils.UnixDial), grpc.WithBlock(), grpc.WithTimeout(time.Second * 5)}

    conn, err := grpc.Dial(socketFile, dialOptions...)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)

    for _, srv := range m.bundleServer {
        req := &pluginapi.RegisterRequest{
            Version:      pluginapi.Version,
            Endpoint:     path.Base(srv.SocketName()),
            ResourceName: srv.ResourceName(),
            Options:      &pluginapi.DevicePluginOptions{PreStartRequired: true},
        }

        glog.V(2).Infof("Register to kubelet with endpoint %s", req.Endpoint)
        _, err = client.Register(context.Background(), req)
        if err != nil {
            return err
        }
    }

    return nil
}

这里分别注册了 vcuda 和 vmemory。vcuda 和 vmemory 的 Allocate 方法都指向了同一个方法,写在了 service/allocator/nvidia/allocator.go 中。

至此,gpu-manager 的启动流程结束。接下来的 gpu-manager 的职责就是等待 kubelet 通过 grpc 的调用,在容器调度到节点的时候进行资源设备的分配,必要目录的挂载等工作了。具体的可以见下一篇文章

最后,提供一个简单的脑图帮助理解:

gpu-manager-arch

kubernetes存储–FlexVolume

简介

kubernetes 使用 volume 来满足它的存储需求,它支持很多的存储系统,比如 nfs、 glusterfs、cephfs等等,但是这些存储的实现方式有一个问题,就是它们的实现代码都必须合并到 Kubernetes 的代码中(称为 in-tree),这为 kubernetes 社区带来了维护上的成本。因此,kubernetes 提出了两种 out-of-tree 的方案: FlexVolume 和 csi。通过这两种方案实现的存储功能不必合并到 kubernetes 的代码仓库,由存储系统的供应商单独维护。

FlexVolume 是这篇文章主要关注的点,FlexVolume 自 1.2 版本开始就支持了。它使用基于 exec 的模型来与驱动程序对接。用户必须在每个节点(有些情况下包括主节点)上的预定义卷插件路径中安装 FlexVolume 驱动程序的可执行文件。当需要挂载 volume 的时候,由 kubelet 执行挂载命令来挂载即可。

基于 nfs 实现 FlexVolume

在探究 FlexVolume 的实现原理之前,我们可以先看一下官方提供的基于 nfs 的例子

注: 我这里是用 minikube 启动的本地 kubernetes 集群。

为了部署基于 nfs 实现的 FlexVolume,我们首先将目录下的 nfs 复制到 deploy 文件夹下

$ cp nfs deploy

然后将 deploy/deploy.sh 中的 dummy 修改成 nfs,表示我们使用的插件脚本是 nfs 这个可执行文件。

接着在 deploy 文件夹下构建 docker 镜像,这里要修改 Dockerfile,将 nfs COPY 到镜像中。然后执行下面的命令(镜像标签需要修改成你自己的):

$ docker build -t joyme/nfs-flexvolume:1.0 .
$ docker push joyme/nfs-flexvolume:1.0

镜像构建并推送完成之后,我们就开始部署了。因为 FlexVolume 要求将驱动文件放在指定的目录下,最粗暴的方式就是手动将文件 scp 到集群的每个节点上。这里为了方便,我们还可以使用 kubernetes 的 Daemenset,然后使用 hostPath 将文件放到主机之上。我们修改 deploy 文件夹下的 ds.yaml 这个部署文件。将我们刚刚推送的镜像填进去。然后执行以下命令进行部署。

$ kubectl apply -f ds.yaml

这里有个地方要注意, 默认的插件安装地址是 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/, 但是 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 都可以修改这个值,如果你启动这些组件是指定了这些参数,那就需要修改 ds.yaml 中的路径。

在集群中部署完成之后,我们可以到某个节点上检查一下/usr/libexec/kubernetes/kubelet-plugins/volume/exec/是否存在我们刚刚部署的文件。

最后我们创建一个 nginx,挂载一个 FlexVolume。在创建之前,我们需要先启动一个 nfs server,这里为了方便,可以使用容器启动一个。

$ docker run -d --privileged --restart=always \
-v /tmp:/dws_nas_scratch \
-e NFS_EXPORT_DIR_1=/dws_nas_scratch \
-e NFS_EXPORT_DOMAIN_1=\* \
-e NFS_EXPORT_OPTIONS_1=ro,insecure,no_subtree_check,no_root_squash,fsid=1 \
-p 111:111 -p 111:111/udp \
-p 2049:2049 -p 2049:2049/udp \
-p 32765:32765 -p 32765:32765/udp \
-p 32766:32766 -p 32766:32766/udp \
-p 32767:32767 -p 32767:32767/udp \
fuzzle/docker-nfs-server:latest

使用官方提供的 nginx-nfs.yaml 文件,然后把其中的 server 地址修改一下,使用以下命令创建:

$ kubectl apply -f nginx-nfs.yaml

注意:如果出现错误,可以检查 node 上是否安装了 jq, nfs-common 等必要的依赖包。

实现原理

在完成上面例子的过程中,关于 FlexVolume 的大多数问题都比较好解答了。我们来看一下 nfs 的实现代码:

usage() {
    err "Invalid usage. Usage: "
    err "\t$0 init"
    err "\t$0 mount <mount dir> <json params>"
    err "\t$0 unmount <mount dir>"
    exit 1
}

err() {
    echo -ne $* 1>&2
}

log() {
    echo -ne $* >&1
}

ismounted() {
    MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
    if [ "${MOUNT}" == "${MNTPATH}" ]; then
        echo "1"
    else
        echo "0"
    fi
}

domount() {
    MNTPATH=$1

    NFS_SERVER=$(echo $2 | jq -r '.server')
    SHARE=$(echo $2 | jq -r '.share')

    if [ $(ismounted) -eq 1 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    mkdir -p ${MNTPATH} &> /dev/null

    mount -t nfs ${NFS_SERVER}:/${SHARE} ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount ${NFS_SERVER}:${SHARE} at ${MNTPATH}\"}"
        exit 1
    fi
    log '{"status": "Success"}'
    exit 0
}

unmount() {
    MNTPATH=$1
    if [ $(ismounted) -eq 0 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    umount ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
        exit 1
    fi

    log '{"status": "Success"}'
    exit 0
}

op=$1

if ! command -v jq >/dev/null 2>&1; then
    err "{ \"status\": \"Failure\", \"message\": \"'jq' binary not found. Please install jq package before using this driver\"}"
    exit 1
fi

if [ "$op" = "init" ]; then
    log '{"status": "Success", "capabilities": {"attach": false}}'
    exit 0
fi

if [ $# -lt 2 ]; then
    usage
fi

shift

case "$op" in
    mount)
        domount $*
        ;;
    unmount)
        unmount $*
        ;;
    *)
        log '{"status": "Not supported"}'
        exit 0
esac

exit 1

其实就是一段 shell 脚本,支持三个命令: init、mount、unmount。当我们在集群中为某个 pod 挂载 FlexVolume时,该 pod 所在节点的 kubelet 会调用其指定的插件脚本执行 mount 命令,然后挂载给 pod 使用。当然了,FlexVolume 还支持更复杂的插件。这个可以看官方的文档: flexvolume

部署方案

关于如何部署 FlexVolume 的插件,其实在例子中也有提到,这里可以总结一下:

  • 手动部署到每个节点的指定目录下,比如我们刚刚部署的 nfs ,其实际路径是: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/k8s~nfs。其中 /usr/libexec/kubernetes/kubelet-plugins/volume/exec 是默认路径,也可以通过 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 来指定。k8s~nfs 这个路径中,k8s 是供应商, nfs 是驱动名称,在使用的时候可以这样指定: `driver: “k8s/nfs”。

  • 使用 kubernetes 的 deamonset 配合 hostPath 来部署,因为 daemonset 会在每个节点上都启动 pod,然后通过 hostPath 将插件放在指定的位置即可。kubernetes 集群中 master 节点可能被设置成不允许调度。这种情况下 daemonset 默认不调度到 master 节点上,可以使用 tolerations 来解决这个问题. 具体可参考: Scheduler is not scheduling Pod for DaemonSet in Master node

  • 其实除了 kubelet 要调用插件之外,controller-manager 也要调用。比如执行 init, attach, detach, waitforattach, isattached 等命令。

argo的输入输出源代码分析

简介

argo是一个工作流的调度引擎,支持 Steps 和 DAG 这两种工作流。

  • Steps: 是按照步骤,从前往后的工作流调度方案。工作流中的每一步都只依赖上一步的结果
  • DAG: 全称是 directed acyclic graph,译为有向无环图。与 Steps 的区别在于每一步可能依赖之前的多步输出,但是不会循环依赖(也就是无环)

不论是在什么类型的工作流上,argo都抽象出了两种输入输出:

  • parameters: 通常情况下都是字符串,该字符串可以来源于标准输出,也可以来源于文件的内容
  • artifacts: 可以理解成文件

输入输出是连接整个工作流的核心。每一步都可以看作是一次函数调用。那么在argo中,它是如何实现在多步之间输入输出的传输呢?下面会通过源代码进行分析。

在看代码之前,可以看一个 argo 的工作流中的一个pod,为了查看更方便,我删除一些不需要关注的字段:

$ kubectl -n workflow describe pods custom-workflow-111-2fw2f-2639432629

Name:           custom-workflow-111-2fw2f-2639432629
Namespace:      workflow
Labels:         pipeline.starx.com/nodeID=743
                workflows.argoproj.io/completed=true
                workflows.argoproj.io/workflow=custom-workflow-111-2fw2f
Annotations:    cni.projectcalico.org/podIP: 10.42.0.83/32
                workflows.argoproj.io/node-name: custom-workflow-111-2fw2f.yolov3-evaluate-743
                workflows.argoproj.io/outputs:
                  {"result":...
                workflows.argoproj.io/template:
                  {"name":"yolov3-evaluate-743","inputs":{"parameters":[{"name":"userParam","value":"eyJTY29yZVRocmVzaG9sZCI6MC41LCJJb3VfVGhyZXNob2xkIjowLjQ...
Controlled By:  Workflow/custom-workflow-111-2fw2f
Init Containers:
  init:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      init
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/inputs/artifacts from input-artifacts (rw)
      /argo/podmetadata from podmetadata (rw)
      /argo/staging from argo-staging (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
Containers:
  wait:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      wait
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/podmetadata from podmetadata (rw)
      /mainctrfs/argo/staging from argo-staging (rw)
      /mainctrfs/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /mainctrfs/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
      /var/run/docker.sock from docker-sock (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
  main:
    Image:         registry.cn-shanghai.aliyuncs.com/xinhuodev/wt:0.4
    Command:
      sh
    Args:
      /argo/staging/script
    Mounts:
      /argo/staging from argo-staging (rw)
      /tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
Volumes:
  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations
  docker-sock:
    Type:          HostPath (bare host directory volume)
    Path:          /var/run/docker.sock
    HostPathType:  Socket
  input-artifacts:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  argo-staging:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  default-token-lfk5b:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-lfk5b
    Optional:    false

我们需要关注的信息有:

  • Pod 的 Annotations
  • Init Containers 启动的初始化容器
  • Containers 中的 wait 容器和 main 容器
  • Pod 的 Volumes 和每个容器的 Mounts

Init 容器

argo 创建的 Pod 的初始化容器执行了 argoexec init 命令,从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go, 我们的分析也从这里开始。在执行 argo exec init之后,第一个调用的函数应该是loadArtifacts()。这个方法中做了三件事: initExecutor()wfExecutor.StageFiles()wfExecutor.LoadArtifacts()

initExecutor:

initExecutor 的代码如下(删除了不重要的代码):

func initExecutor() *executor.WorkflowExecutor {
    tmpl, err := executor.LoadTemplate(podAnnotationsPath)

    var cre executor.ContainerRuntimeExecutor
    switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
    case common.ContainerRuntimeExecutorK8sAPI:
        cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
    case common.ContainerRuntimeExecutorKubelet:
        cre, err = kubelet.NewKubeletExecutor()
    case common.ContainerRuntimeExecutorPNS:
        cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
    default:
        cre, err = docker.NewDockerExecutor()
    }

    wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
    yamlBytes, _ := json.Marshal(&wfExecutor.Template)
    return &wfExecutor
}

podAnnotationsPath加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations,这正好是 init 容器的挂载,而这个挂载对应的卷是:

 podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations

这里的 DownwardAPI 也解释一下,它是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。

接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor实例化了一个 wfExecutor

StageFiles()

源代码如下:

func (we *WorkflowExecutor) StageFiles() error {
    var filePath string
    var body []byte
    switch we.Template.GetType() {
    case wfv1.TemplateTypeScript:
        log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
        filePath = common.ExecutorScriptSourcePath
        body = []byte(we.Template.Script.Source)
    case wfv1.TemplateTypeResource:
        log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
        filePath = common.ExecutorResourceManifestPath
        body = []byte(we.Template.Resource.Manifest)
    default:
        return nil
    }
    err := ioutil.WriteFile(filePath, body, 0644)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return nil
}

职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script。这就是我们在 main 容器中执行的脚本了。

LoadArtifacts

// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
    for _, art := range we.Template.Inputs.Artifacts {
        artDriver, err := we.InitDriver(art)

        var artPath string
        mnt := common.FindOverlappingVolume(&we.Template, art.Path)
        if mnt == nil {
            artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
        } else {
            // If we get here, it means the input artifact path overlaps with an user specified
            // volumeMount in the container. Because we also implement input artifacts as volume
            // mounts, we need to load the artifact into the user specified volume mount,
            // as opposed to the `input-artifacts` volume that is an implementation detail
            // unbeknownst to the user.
            log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
            artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
        }

        // The artifact is downloaded to a temporary location, after which we determine if
        // the file is a tarball or not. If it is, it is first extracted then renamed to
        // the desired location. If not, it is simply renamed to the location.
        tempArtPath := artPath + ".tmp"
        err = artDriver.Load(&art, tempArtPath)
        if err != nil {
            return err
        }
        if isTarball(tempArtPath) {
            err = untar(tempArtPath, artPath)
            _ = os.Remove(tempArtPath)
        } else {
            err = os.Rename(tempArtPath, artPath)
        }

        if art.Mode != nil {
            err = os.Chmod(artPath, os.FileMode(*art.Mode))
        }
    }
    return nil
}

InitDriver是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。

FindOverlappingVolume 是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts 作为 artifacts 的基础路径。否则使用 /mainctrfs 作为路径。

下面就是下载文件,解压文件并修改权限了。

注意在这里,init、wait和main容器都挂载了input-artifactsargo-staging,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。

wait 容器

wait容器的职责有以下几点:

  • 等待 main 容器结束
  • 杀死 sidecar
  • 保存日志
  • 保存 parameters
  • 保存 artifacts
  • 获取脚本的输出流
  • 将输出放在 Annotations 上

下面我们看这些功能点的实现:

等待 main 容器结束

// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
    // WaitInit() 是初始化操作,只有 PSN 需要
    err := we.RuntimeExecutor.WaitInit()
    if err != nil {
        return err
    }
    log.Infof("Waiting on main container")
    // waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
    // 不为空说明启动了
    mainContainerID, err := we.waitMainContainerStart()
    if err != nil {
        return err
    }
    log.Infof("main container started with container ID: %s", mainContainerID)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // monitorAnnotations是因为pod的annotations会更改
    annotationUpdatesCh := we.monitorAnnotations(ctx)
    // 超时会杀死
    go we.monitorDeadline(ctx, annotationUpdatesCh)

    // 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
    err = we.RuntimeExecutor.Wait(mainContainerID)
    if err != nil {
        return err
    }
    log.Infof("Main container completed")
    return nil
}

杀死 sidecar

main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
    if len(we.Template.Sidecars) == 0 {
        log.Infof("No sidecars")
        return nil
    }
    log.Infof("Killing sidecars")
    pod, err := we.getPod()
    if err != nil {
        return err
    }
    sidecarIDs := make([]string, 0)
    // 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
    for _, ctrStatus := range pod.Status.ContainerStatuses {
        if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
            continue
        }
        if ctrStatus.State.Terminated != nil {
            continue
        }
        containerID := containerID(ctrStatus.ContainerID)
        log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
        sidecarIDs = append(sidecarIDs, containerID)
    }
    if len(sidecarIDs) == 0 {
        return nil
    }
    return we.RuntimeExecutor.Kill(sidecarIDs)
}

保存日志

argo 是支持将 main 容器中的日志持久化并保存到指定的地方的(s3, hdfs, Artifactory)。这在 argo 的文档上好像没有提到过。这一部分的逻辑比较简单,就是通过 ContainerRuntime 获取获取容器中的输出流,然后存成文件,通过 argo 中的 storage driver 保存下来。

保存 parameters

// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters() error {
    if len(we.Template.Outputs.Parameters) == 0 {
        log.Infof("No output parameters")
        return nil
    }
    log.Infof("Saving output parameters")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    // 遍历模板参数
    for i, param := range we.Template.Outputs.Parameters {
        log.Infof("Saving path output parameter: %s", param.Name)
        // Determine the file path of where to find the parameter
        if param.ValueFrom == nil || param.ValueFrom.Path == "" {
            continue
        }

        var output string
        if we.isBaseImagePath(param.ValueFrom.Path) {
            log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
            // 容器内,通过 runtime 获取
            output, err = we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
            if err != nil {
                return err
            }
        } else {
            log.Infof("Copying %s from from volume mount", param.ValueFrom.Path)
            mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
            // 容器的挂载卷,直接获取
            out, err := ioutil.ReadFile(mountedPath)
            if err != nil {
                return err
            }
            output = string(out)
        }

        outputLen := len(output)
        // Trims off a single newline for user convenience
        if outputLen > 0 && output[outputLen-1] == '\n' {
            output = output[0 : outputLen-1]
        }
        // 保存下来
        we.Template.Outputs.Parameters[i].Value = &output
        log.Infof("Successfully saved output parameter: %s", param.Name)
    }
    return nil
}

保存 artifacts

保存 artifacts 和 保存 parameters 的操作是一样的。

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
    if len(we.Template.Outputs.Artifacts) == 0 {
        log.Infof("No output artifacts")
        return nil
    }
    log.Infof("Saving output artifacts")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    err = os.MkdirAll(tempOutArtDir, os.ModePerm)
    if err != nil {
        return errors.InternalWrapError(err)
    }

    for i, art := range we.Template.Outputs.Artifacts {
        err := we.saveArtifact(mainCtrID, &art)
        if err != nil {
            return err
        }
        we.Template.Outputs.Artifacts[i] = art
    }
    return nil
}

获取脚本的输出流

直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中

func (we *WorkflowExecutor) CaptureScriptResult() error {
    if we.Template.Script == nil {
        return nil
    }
    log.Infof("Capturing script output")
    mainContainerID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }
    reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
    if err != nil {
        return err
    }
    defer func() { _ = reader.Close() }()
    bytes, err := ioutil.ReadAll(reader)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    out := string(bytes)
    // Trims off a single newline for user convenience
    outputLen := len(out)
    if outputLen > 0 && out[outputLen-1] == '\n' {
        out = out[0 : outputLen-1]
    }
    we.Template.Outputs.Result = &out
    return nil
}

将输出放在 Annotations 上

将 outputs 存在 pod 的 annotations 上。

func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
    outputs := we.Template.Outputs.DeepCopy()
    if logArt != nil {
        outputs.Artifacts = append(outputs.Artifacts, *logArt)
    }

    if !outputs.HasOutputs() {
        return nil
    }
    log.Infof("Annotating pod with output")
    outputBytes, err := json.Marshal(outputs)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

总结

init 容器做了 pod 的初始化,包括存储 script,下载 artifacts等等,这样我们的 main 容器就不用关心输入的来源,只需要在指定地方使用即可。wait 容器负责监控 main 容器的生命周期,在 main 容器中的主要逻辑运行结束之后,负责将输出部分读取,持久化,这样 main 容器就不用操心如何将该步产生的结果传到后面的步骤上的问题。