[Gaia Scheduler] gpu-manager 启动流程分析

概述

Gaia scheduler 是腾讯开源的在 Kubernetes 集群中做 GPU 虚拟化的方案,实现了为容器分配虚拟化 GPU 资源并加以限制,它的最大的优势就是不需要特殊的硬件支持,并且性能损耗很小。关于它的论文,地址在这里:Gaia Scheduler: A Kubernetes-Based Scheduler Framework。如果想要理解这个项目,强烈建议先读这篇论文。

Gaia Scheduler 可以分为 4 个组件:

  • GPU Manager: 作为 device plugin 向 kubelet 注册。共注册了两个设备,包括 vcore 和 vmemory,支持两种计算资源:tencent.com/vcuda-coretencent.com/vcuda-memory,分别用来做 GPU 计算资源和 GPU 内存资源的请求和限制。

  • GPU Scheduler: 这里的 scheduler 并不是 kubernetes 的调度器,是 GPU Manager 在收到 kubelet 的 Allocate 调用后,它需求将设备挂载给容器。为了实现最佳的 GPU 挂载,就有这样一个专门的 Scheduler 来根据节点上当前的 GPU 拓扑和资源占用情况进行调度。

  • vGPU Manager: vGPU Manager 是具体负责管理容器的组件,包括监控容器状态,传递配置,和容器内的vGPU Library通信,以及在容器死亡后进行回收操作。

  • vGPU Library: vGPU Library 虽然相关的代码量不多,但它是 Gaia Scheduler 最重要的部分。因为它是实现 GPU 虚拟化的核心。通过覆盖容器中的 LD_LIBRARY_PATH 以及自定义了 libcuda-control.so 实现对 CUDA API 的拦截。

Gaia Scheduler 主要由三个项目组成: gpu-managervcuda-controllergpu-admission。但是这里的 gpu-manager 是 Gaia Scheduler 的主要实现,包含了上述的 4 个组件,vcuda-controller 就是 vGPU Library,已经被打包到了 gpu-manager 这个项目中。gpu-manager 需要配合 gpu-admission 项目来完成 GPU Scheduler 的工作。不要因此产生误解。下文中我们主要就 gpu-manager 这个项目进行分析。

启动流程分析

gpu-manager 本身主要作为 kubernetes 的 device plugin 来实现的,定义了两种设备: vcuda-corevcuda-memory,我们的应用通过 pod 的资源字段进行申请,然后 kube-scheduler 会根据节点上的资源状态进行调度。因此,你最好还需要了解 kubernetes 的 device plugin 的开发知识。关于 device plugin 的开发,可以看之前的一篇文章:Kubernetes开发知识–device-plugin的实现

启动参数

分析一个项目从启动参数开始,可以帮助我们快速了解:

  • driver: 这个是 GPU 的驱动,当前的默认值是 nvidia,很显然该项目可以扩展支持其他类型的 GPU。
  • extra-config: 额外的配置,这个参数暂时看不出来有什么特别
  • volume-config: 这里的 volume 指的是一些动态链接库和可执行文件的位置。也就是 gpu-manager 需要拦截调用的一些库
  • docker-endpoint: 用来挂载到容器中和 docker 做通信的,默认位置是 unix:////var/run/docker.sock
  • query-port: 统计信息服务的查询接口
  • query-port: 统计信息服务的监听地址
  • kubeconfig: 用来授权的配置文件
  • standalone: 暂时还不清楚的参数
  • sample-period: gpu-manager 会查询 gpu 设备的使用情况,这个参数用来设定采样周期
  • node-labels: 给节点自动打标签
  • hostname-override: gpu-manager 在运行时,只关注自己节点上的 pod,这主要是通过 hostname 来辨认的
  • virtual-manager-path: gpu-manager 会为所有需要虚拟 gpu 资源的 pod 创建唯一的文件夹,文件夹的路径就在这个地址下。
  • device-plugin-path: kubernetes 默认的 device plugin 的目录地址
  • checkpoint-path: gpu-manager 会产生 checkpoint 来当缓存用
  • share-mode: gpu-manager 最大的特点就是将一个物理 gpu 分成多个虚拟 gpu,也就是共享模式
  • allocation-check-period: 检查分配了虚拟 gpu 资源的 pod 的状态,及时回收资源
  • incluster-mode: 是否在集群内运行

服务启动

gpu-manager 推荐的部署方案是通过 kubernetes 的 daemonset,然后配置 node selector 调度到指定的节点上。然后 gpu-manager 就开始在指定节点上启动了。

srv := server.NewManager(cfg)
go srv.Run()

这里,我们需要看一下这个 srv 的具体实现,首先是它的结构体:

type managerImpl struct {
    config *config.Config

    allocator      allocFactory.GPUTopoService     // gpu 容器调度分配
    displayer      *display.Display                // gpu 使用情况可视化服务
    virtualManager *vitrual_manager.VirtualManager // 负责管理 vgpu

    bundleServer map[string]ResourceServer
    srv          *grpc.Server
}

config 包含了我们上面的所有参数,就不进去细看了。

allocator 负责在容器调度到节点上后,为其分配具体的设备资源。allocator 实现了探测节点上的 gpu 拓扑架构,然后以最佳性能,最少碎片为目的使用最优的方案进行资源分配。

displayer 是将 gpu 的使用情况输出,方便我们查看。

virtualManager 负责 vgpu 分配后的管理工作。

bundleServer 包含 vcore,vmemory,我们上面提到这两种资源以 device plugin 的方式进行注册,因此他们需要启动 grpc server。

srv: 将 gpu display server 注册到这个 grpc server 中。

接下来,我们就可以分析 srv.Run() 方法具体执行了哪些内容。为了先对整个流程有个大概的印象,我将内容整理成以下条目:

  • 启动 volumeManager,将节点上和 nvidia gpu (包括cuda) 的所有可执行文件和库移动到 /etc/gpu-manager/vdriver 中。并且将关键的库替换成 vcuda-control,实现 cuda 调用的拦截。
  • watchdog 创建 pod 缓存并监控 pod,之后所有关于 pod 的操作都来源于这里。
  • watchdog 给节点打上标签
  • 启动 virtualManager
  • gpu 拓扑结构感知。
  • 初始化资源分配器
  • 设置 vcuda, vmemory, display 的 grpc 服务
  • 启动 metrics 的 http 服务,主要是提供给 prometheus
  • 启动 vcuda,vmemory 的 grpc 服务
  • 启动 display 的 grpc 服务

接下来,我们具体来分析每一步是如何做的。当然,这里只会挑一些重点的部分。

volumeManager 的启动

func (vm *VolumeManager) Run() (err error) {
    // ldcache 是动态链接库的缓存信息
    cache, err := ldcache.Open()
    defer func() {
        if e := cache.Close(); err == nil {
            err = e
        }
    }()
    vols := make(VolumeMap)
    for _, cfg := range vm.Config {
        vol := &Volume{
            Path: path.Join(cfg.BasePath, cfg.Name),
        }

        if cfg.Name == "nvidia" {
            // nvidia 库的位置
            types.DriverLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        } else {
            // origin 库的位置
            types.DriverOriginLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        }

        for t, c := range cfg.Components {
            switch t {
            case "binaries":
                // 调用 which 来查找可执行文件的位置
                bins, err := which(c...)
                // 将实际位置存起来
                vol.dirs = append(vol.dirs, volumeDir{binDir, bins})
            case "libraries":
                // 是库的话,就从 ldcache 里面去找
                libs32, libs64 := cache.Lookup(c...)
                // 将 library 位置存起来
                vol.dirs = append(vol.dirs, volumeDir{lib32Dir, libs32}, volumeDir{lib64Dir, libs64})
            }
            vols[cfg.Name] = vol
        }
    }
    // 找到了需要的库位置之后,做 mirror 处理
    if err := vm.mirror(vols); err != nil {
        return err
    }
    return nil
}

这段代码的前半部分都是在查找指定的动态链接库和可执行文件,这些文件是在 volume.conf 这个配置文件中指定的,通过参数传进来。查找动态链接库时,使用的是 ldcache,查找可执行文件时,使用了系统的 which 指令。找到之后会将其所在位置记录下来。接着就是对找到的库做 mirror 处理。

func (vm *VolumeManager) mirror(vols VolumeMap) error {
    // nvidia 和 origin
    for driver, vol := range vols {
        if exist, _ := vol.exist(); !exist {
            // 这里的path是/etc/gpu-manager/vdriver下面
            if err := os.MkdirAll(vol.Path, 0755); err != nil {
                return err
            }
        }
        for _, d := range vol.dirs {
            vpath := path.Join(vol.Path, d.name)
            // 创建 bin lib lib64
            if err := os.MkdirAll(vpath, 0755); err != nil {
                return err
            }

            // For each file matching the volume components (blacklist excluded), create a hardlink/copy
            // of it inside the volume directory. We also need to create soname symlinks similar to what
            // ldconfig does since our volume will only show up at runtime.
            for _, f := range d.files {
                glog.V(2).Infof("Mirror %s to %s", f, vpath)
                if err := vm.mirrorFiles(driver, vpath, f); err != nil {
                    return err
                }

                if strings.HasPrefix(path.Base(f), "libcuda.so") {
                    driverStr := strings.SplitN(strings.TrimPrefix(path.Base(f), "libcuda.so."), ".", 2)
                    types.DriverVersionMajor, _ = strconv.Atoi(driverStr[0]) // 驱动版本号
                    types.DriverVersionMinor, _ = strconv.Atoi(driverStr[1])
                    glog.V(2).Infof("Driver version: %d.%d", types.DriverVersionMajor, types.DriverVersionMinor)
                }

                if strings.HasPrefix(path.Base(f), "libcuda-control.so") {
                    vm.cudaControlFile = f
                }
            }
        }
    }

    vCudaFileFn := func(soFile string) error {
        if err := os.Remove(soFile); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, soFile); err != nil {
            return err
        }

        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, soFile)

        l := strings.TrimRight(soFile, ".0123456789")
        if err := os.Remove(l); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, l); err != nil {
            return err
        }
        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, l)
        return nil
    }

    if vm.share && len(vm.cudaControlFile) > 0 {
        if len(vm.cudaSoname) > 0 {
            for _, f := range vm.cudaSoname {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }

        if len(vm.mlSoName) > 0 {
            for _, f := range vm.mlSoName {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码先会对所有上面查找到的库或可执行文件调用 mirrorFiles,但是记录下来了 libcuda.so 的版本号和 libcuda-control.so 的位置。注意,这个 libcuda-control 就是 vcuda-control 项目生成的用来拦截 cuda 调用的库。

然后将 cudaControlFile clone到所有 cudaSonamemlSoName 中库的位置。这个 clone 方法会先尝试硬链接过去,如果失败就直接复制过去。这里的 cudaControlFile 就是我们上面所说的 libcuda-control.so 啦。cudaSonamemlSoName 包含了所有需要被拦截调用的库。这样子就实现了拦截所有的 cuda 调用。下面我们在看一下 mirrorFiles 这个方法就可以了。

// driver 是配置文件中的 "nvidia" 或 "origin"
// vpath 是要 mirror 到的位置,在 /etc/gpu-manager/vdriver 下面
func (vm *VolumeManager) mirrorFiles(driver, vpath string, file string) error {
    // In computing, the Executable and Linkable Format (ELF, formerly named Extensible Linking Format), is a common standard file format for executable files, object code, shared libraries, and core dumps
    obj, err := elf.Open(file)
    defer obj.Close()

    // 黑名单机制,具体用处还不清楚,跟 nvidia 的驱动相关
    ok, err := blacklisted(file, obj)
    if ok {
        return nil
    }
    l := path.Join(vpath, path.Base(file))
    // 不管有没有,先尝试把 gpu-manager 里面的移除
    if err := removeFile(l); err != nil {
        return err
    }
    // clone 优先硬连接,其次是复制文件到指定位置
    if err := clone(file, l); err != nil {
        return err
    }
    // 从 elf 中获取当前库的 soname
    soname, err := obj.DynString(elf.DT_SONAME)
    if len(soname) > 0 {
        // 将获取到 soname 组成路径
        l = path.Join(vpath, soname[0])
        // 如果文件和它的soname不一致(是否可以认为这个文件是软链接过去的)
        if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
            return err
        }

        // XXX Many applications (wrongly) assume that libcuda.so exists (e.g. with dlopen)
        // Hardcode the libcuda symlink for the time being.
        if strings.Contains(driver, "nvidia") {
            // 这里为什么要移除 libcuda.so 和 libnvidia-ml.so 的软链接
            // 因为gpu调用会涉及到这两个库,这两个库会软链接到真实的库上。移除后替换成拦截的库
            // Remove libcuda symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libcuda.so") {
                os.Remove(l)
                vm.cudaSoname[l] = l
            }

            // Remove libnvidia-ml symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libnvidia-ml.so") {
                os.Remove(l)
                vm.mlSoName[l] = l
            }

            // XXX GLVND requires this symlink for indirect GLX support
            // It won't be needed once we have an indirect GLX vendor neutral library.
            if strings.HasPrefix(soname[0], "libGLX_nvidia") {
                l = strings.Replace(l, "GLX_nvidia", "GLX_indirect", 1)
                if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码中,先使用 blacklisted 排除一些不需要处理的库,然后尝试将库或可执行文件 clone 到我们的 /etc/gpu-manager/vdriver 下面。/etc/gpu-manager/vdriver 下面有两个文件夹,一个是 nvidia,保存了已经被我们拦截的库,一个是 origin,这里面是原始的未处理的库。同时,还将 libcuda.so 和 libnvidia-ml.so 移除了,这样就调用不到真实的库了,转而在之后用我们拦截的库来替换这几个文件。

至此,volumeManager 分析结束。

gpu 拓扑结构感知

关于 gpu 拓扑结构这一块,主要是为了在之后做资源分配时选择最优方案用的。腾讯也有分享过这一块的资料(腾讯基于 Kubernetes 的企业级容器云实践):

gpu 拓扑结构

这里不影响我们理解整个工作机制,所以先不分析。

初始化资源分配器

// 分配器,根据driver调用相应的分配器
initAllocator := allocFactory.NewFuncForName(m.config.Driver)
if initAllocator == nil {
    return fmt.Errorf("can not find allocator for %s", m.config.Driver)
}

m.allocator = initAllocator(m.config, tree, client)

这里的 initAllocator 对应的方法是:

//NewNvidiaTopoAllocator returns a new NvidiaTopoAllocator
func NewNvidiaTopoAllocator(config *config.Config, tree device.GPUTree, k8sClient kubernetes.Interface) allocator.GPUTopoService {
    runtimeRequestTimeout := metav1.Duration{Duration: 2 * time.Minute}
    imagePullProgressDeadline := metav1.Duration{Duration: 1 * time.Minute}
    dockerClientConfig := &dockershim.ClientConfig{
        DockerEndpoint:            config.DockerEndpoint,
        RuntimeRequestTimeout:     runtimeRequestTimeout.Duration,
        ImagePullProgressDeadline: imagePullProgressDeadline.Duration,
    }

    _tree, _ := tree.(*nvtree.NvidiaTree)
    cm, err := checkpoint.NewManager(config.CheckpointPath, checkpointFileName)
    if err != nil {
        glog.Fatalf("Failed to create checkpoint manager due to %s", err.Error())
    }
    alloc := &NvidiaTopoAllocator{
        tree:              _tree,
        config:            config,
        evaluators:        make(map[string]Evaluator),
        dockerClient:      dockershim.NewDockerClientFromConfig(dockerClientConfig),
        allocatedPod:      cache.NewAllocateCache(),
        k8sClient:         k8sClient,
        queue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        stopChan:          make(chan struct{}),
        checkpointManager: cm,
    }

    // Load kernel module if it's not loaded
    alloc.loadModule()

    // Initialize evaluator
    alloc.initEvaluator(_tree)

    // Read extra config if it's given
    alloc.loadExtraConfig(config.ExtraConfigPath)

    // Process allocation results in another goroutine
    go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan)

    // Recover
    alloc.recoverInUsed()

    // Check allocation in another goroutine periodically
    go alloc.checkAllocationPeriodically(alloc.stopChan)

    return alloc
}

allocator 调用 loadModule() 来启用 nvidia 的内核模块。

调用 initEvaluator(_tree) 来初始化评估器,这里的 _tree 就是感知到的 gpu 拓扑结构。

调用 loadExtraConfig(config.ExtraConfigPath) 来加载启动时传入的额外参数配置文件。

go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan) 创建了新的协程来处理分配结果。

recoverInUsed() 是恢复 gpu 分配结果。比如在 gpu-manager 重启之后,之前的 gpu 分配结果都丢失了,但是节点上还有大量的容器正在占用 gpu,这个方法会通过查找节点上存活的容器,通过 docker endpoint, 调用 InspectContainer 获取容器中占用的 device id,然后标记该设备和容器之间的占用关系。

go alloc.checkAllocationPeriodically(alloc.stopChan) 创建新的协程来周期性的检查资源分配情况。如果是 Failed 和 Pending 状态的容器,就根据错误信息检查是否应该删除它们,然后如果这些 pod 的控制器是 deployment 类似的,就尝试删除它们,这样控制器会重新创建这些 pod 进行调度,让这些 pod 恢复到正常运行状态。

启动各种服务

vcuda,vmemory 的 grpc 服务是 device plugin 的机制。metrics service 是提供给 prometheus 调用的,以监控该节点的相关信息。display 服务会打印 gpu 拓扑结构的相关信息。

Device plugin 的注册

Device plugin

这张图是 device plugin 注册的时序图。gpu-manager 的注册方法是:

func (m *managerImpl) RegisterToKubelet() error {
    socketFile := filepath.Join(m.config.DevicePluginPath, types.KubeletSocket)
    dialOptions := []grpc.DialOption{grpc.WithInsecure(), grpc.WithDialer(utils.UnixDial), grpc.WithBlock(), grpc.WithTimeout(time.Second * 5)}

    conn, err := grpc.Dial(socketFile, dialOptions...)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)

    for _, srv := range m.bundleServer {
        req := &pluginapi.RegisterRequest{
            Version:      pluginapi.Version,
            Endpoint:     path.Base(srv.SocketName()),
            ResourceName: srv.ResourceName(),
            Options:      &pluginapi.DevicePluginOptions{PreStartRequired: true},
        }

        glog.V(2).Infof("Register to kubelet with endpoint %s", req.Endpoint)
        _, err = client.Register(context.Background(), req)
        if err != nil {
            return err
        }
    }

    return nil
}

这里分别注册了 vcuda 和 vmemory。vcuda 和 vmemory 的 Allocate 方法都指向了同一个方法,写在了 service/allocator/nvidia/allocator.go 中。

至此,gpu-manager 的启动流程结束。接下来的 gpu-manager 的职责就是等待 kubelet 通过 grpc 的调用,在容器调度到节点的时候进行资源设备的分配,必要目录的挂载等工作了。具体的可以见下一篇文章

最后,提供一个简单的脑图帮助理解:

gpu-manager-arch

Kubernetes开发知识–device-plugin的实现

什么是 device plugin

Kubernetes 作为一个自动化容器编排系统,在调度 pod 的时候会根据容器需要的资源进行节点的选择,节点的选择会分为预选和优选阶段。预选阶段会根据所有节点上剩余的资源量与 pod 需要的资源量进行对比,选出能够满足需求的节点。通常情况下,这里的资源都会包括 CPU 和 Memory,就像下面这样:

resources:
  requests:
    memory: "64Mi"
    cpu: "250m"
  limits:
    memory: "128Mi"
    cpu: "500m"

但是现在我们有了新的需求,我们有一个 tensorflow 的模型需要借助 tensorflow serving 来部署,同时我们希望采用 GPU 部署的方案以加快模型的在线推算速度。这样我们就需要一个 GPU 的资源并借助 Kubernetes 的调度器将容器调度到有空余 GPU 资源的方案。

在 1.11 版本之前的 Kubernetes 中,提供了 alpha.kubernetes.io/nvidia-gpu 的资源名称来帮助我们根据 GPU 资源调度。但是这也带来了一些问题:

  • Kubernetes 需要维护 NVIDIA GPU 相关的代码,增加了维护成本。
  • NVIDIA GPU 方面的专家不一定熟悉 Kubernetes,这不符合让最擅长的人做最擅长的事的原则。
  • 除了 NVIDIA GPU,还会有其他的计算资源需要支持。

因此,Kubernetes 在 1.8 版本引入了 device plugin 机制,将第三方的计算资源通过插件的方式引入 Kubernetes,并且由第三方厂商自行维护。Kubernetes 社区的活瞬间就轻松了,第三方厂商也开心了。

通过以上的说明,可以总结出 device plugin 主要用来解耦第三方计算资源和 kubernetes 系统,将第三方的计算资源通过插件的方式引入 Kubernetes。当然 cpu 和 memory 除外,毕竟谁还能少了 CPU 和 Memory 呢。

device plugin 能做什么

目前一些常用的 device plugin 有:

我了解的还有腾讯的 Gaia Scheduler,通过 device plugin 实现的 GPU 虚拟化方案。如果 NVIDIA 的 GPU 方案还不够适合你,可以看看腾讯的这个方案:tkestack/gpu-manager

我觉得腾讯的 GPU 虚拟化方案是最能说明 device plugin 使用场景的例子,通过定义 tencent.com/vcuda-coretencent.com/vcuda-memory 这两个计算资源,来将一个物理 GPU 划分成多个虚拟 GPU 进行调度,这样可以实现一个 GPU 上部署多个 tensorflow serving。你不需要购买特殊的硬件或者修改任何 Kubernetes 的代码,就有了 GPU 虚拟化的能力。

下面我们开个脑洞,现在我们有了一种叫做 cola 的计算资源,可以提高程序的 IO 能力。但是 cola 的资源有限,只分配给特定的容器使用。这时候我们就可以通过实现自己的 device plugin 来满足这个需求。我们的计算资源名就叫做: myway5.com/cola,在下面一小节做具体的实现。

device plugin 的实现方案

device plugin 的工作原理其实不复杂。主要有以下步骤:

  • 首先 device plugin 可以通过手动或 daemonset 部署到需要的节点上。
  • 为了让 Kubernetes 发现 device plugin,需要向 kubelet 的 unix socket。 进行注册,注册的信息包括 device plugin 的 unix socket,API Version,ResourceName。
  • kubelet 通过 grpc 向 device plugin 调用 ListAndWatch, 获取当前节点上的资源。
  • kubelet 向 api server 更新节点状态来通知资源变更。
  • 用户创建 pod,请求资源并调度到节点上后,kubelet 调用 device plugin 的 Allocate 进行资源分配。

时序图如下:

device plugins

在 device plugin 的实现中,最关键的两个要实现的方法是 ListAndWatchAllocate。除此之外,还要注意监控 kubelet 的重启,一般是使用 fsnotify 类似的库监控 kubelet.sock 的重新创建事件。如果重新创建了,则认为 kubelet 是重启了,我们需要重新向 kubelet 注册 device plugin。

ListAndWatch

我们上面定义的 myway5.com/cola 资源用 /etc/colas 下的文件代表。每一个文件代表一个可用的资源。因此实现 ListAndWatch 就是查找该文件夹下的文件,然后添加到设备列表发送给 kubelet,之后调用 fsnotify 去监控文件的 CREATEREMOVE 事件。每次设备列表发生变更都重新向 kubelet 发送更新过的设备列表。

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (s *ColaServer) ListAndWatch(e *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
    log.Infoln("ListAndWatch called")
    devs := make([]*pluginapi.Device, len(s.devices))

    i := 0
    for _, dev := range s.devices {
        devs[i] = dev
        i++
    }

    err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
    if err != nil {
        log.Errorf("ListAndWatch send device error: %v", err)
        return err
    }

    // 更新 device list
    for {
        log.Infoln("waiting for device change")
        select {
        case <-s.notify:
            log.Infoln("开始更新device list, 设备数:", len(s.devices))
            devs := make([]*pluginapi.Device, len(s.devices))

            i := 0
            for _, dev := range s.devices {
                devs[i] = dev
                i++
            }

            srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
        }
    }
}

Allocate

在用户创建的 Pod 请求资源时,Kubernetes 的调度器会进行调度,并通过 kubelet 向 device plugin 发出 Allocate 调用,这一步的调用主要是为了让 device plugin 为容器调度资源。 在调度成功后向 kubelet 返回调度结果即可。

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (s *ColaServer) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    log.Infoln("Allocate called")
    resps := &pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        log.Infof("received request: %v", strings.Join(req.DevicesIDs, ","))
        resp := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "COLA_DEVICES": strings.Join(req.DevicesIDs, ","),
            },
        }
        resps.ContainerResponses = append(resps.ContainerResponses, &resp)
    }
    return resps, nil
}

部署

device plugin 可以手动部署到机器上,也可以通过 Daemonset 进行部署。这里当然是 Daemonset 进行部署了。部署的时候有几个注意事项:

  • 需要挂载 hostPath,其中 /var/lib/kubelet/device-plugins 是必须的。这个文件夹下有 kubelet.sock,以及我们也需要将 device plugin 的 unix socket 文件存在这里。使得 kubelet 可以和我们的应用通信。
  • 为 device plugin 的 Pod 设置调度优先级别,通常设置成 priorityClassName: "system-node-critical"。这样可以保证不会因为节点利用率过高被逐出。
  • 如果资源设备不是每台机器都有,建议使用 nodeSelector 将 device plugin 调度到指定的机器上。

device plugin 的开发源代码可以参考上面的 cola 例子:cola device plugin

部署结束之后,可以查看一下节点的资源情况:

$ kubectl describe nodes test
Capacity:
 cpu:                2
 ephemeral-storage:  17784752Ki
 hugepages-2Mi:      0
 memory:             1986740Ki
 myway5.com/cola:    2
 pods:               110
Allocatable:
 cpu:                2
 ephemeral-storage:  17784752Ki
 hugepages-2Mi:      0
 memory:             1986740Ki
 myway5.com/cola:    2
 pods:               110

创建一个 pod,请求 myway5.com/cola 资源:

$ kubectl apply -f e2e/pod-with-cola.yaml

然后查看一下 cola pod 的日志来了解设备发现和调度情况:

$ kubectl -n kube-system logs cola-thtm9 
time="2020-03-24T08:16:53Z" level=info msg="cola device plugin starting"
time="2020-03-24T08:16:53Z" level=info msg="find device 'cocacola'"
time="2020-03-24T08:16:53Z" level=info msg="find device 'peisicola'"
time="2020-03-24T08:16:53Z" level=info msg="watching devices"
time="2020-03-24T08:16:53Z" level=info msg="start GPPC server for 'myway5.com/cola'"
time="2020-03-24T08:16:53Z" level=info msg="Register to kubelet with endpoint cola.sock"
time="2020-03-24T08:16:53Z" level=info msg="register to kubelet successfully"
time="2020-03-24T08:16:53Z" level=info msg="ListAndWatch called"
time="2020-03-24T08:16:53Z" level=info msg="waiting for device change"
time="2020-03-24T08:17:10Z" level=info msg="Allocate called"

Kubernetes 开发知识–Kubernetes 准入控制与 admission webhook 的使用

一、什么是准入控制

我们都知道 Kubernetes 的最核心组件就是它的 API Server,所有资源的创建、更新和删除都是通过 API Server 进行的。我们可以通过 HTTP 请求或者 kubectl 这样的客户端来和 APIServer 通信,在我们操作对象的请求到达 API Server 之前,会先到达准入控制器这里。准入控制器可以执行“验证”和“变更”操作。因此我们可以认为有两种准入控制器:变更(mutating)准入控制器验证(validating)准入控制器

准入控制过程也同样分为两个阶段。第一阶段,运行变更准入控制器,对对象进行修改操作;第二阶段,运行验证准入控制器。如果任何一个阶段的任何控制器拒绝了该请求,则整个请求将立即被拒绝,并向终端用户返回一个错误。

我们可以通过下面的图来直观的感受一下:

Kubernetes 开发知识--Kubernetes 准入控制与 admission webhook 的使用

二、准入控制能做什么

准入控制存在的目的就是提高 Kubernetes 架构的灵活性。因此 Kubernetes 提供了一些准入控制器,并且允许你打开或关闭一部分,同时你也可以自定义准入控制器,来完成你自己的一些特殊需求。可以自定义的准入控制器也分为两种:一个叫 MutatingAdmissionWebhook,一个叫 ValidatingAdmissionWebhook,其实也是我们上面说的变更准入控制器验证准入控制器

Kubernetes 提供了一些常用的准入控制器,下面举几个例子:

  • AlwaysPullImages: 该准入控制器会修改每一个新创建的 Pod 的镜像拉取策略为 Always。这样在多租户集群里,用户就能保证自己的私有镜像只会在有凭证的情况下使用,而不会出现因为机器上缓存了镜像而被其他用户直接使用的情况。

  • NamespaceLifecycle:该准入控制器禁止在一个正在被终止的 Namespace 中创建新对象,并且确保使用不存在的 Namespace 的请求被拒绝。该准入控制器还会禁止删除三个系统保留的命名空间,即 defaultkube-systemkube-public

除了 Kubernetes 提供的这些准入控制器,我们可以编写 MutatingAdmissionWebhookValidatingAdmissionWebhook。比如我之前有写一个叫做 [lazykube](https://github.com/joyme123/lazykube)MutatingAdmissionWebhook,它的作用是对每一个创建的 Pod 的镜像源都进行校验,如果是 docker hub, gcr.io, quay.io 等地址的镜像,就修改成国内的代理源,这样就可以自动解决镜像需要翻墙下载的问题了。

三、如何使用 admission webhook 编写一个准入控制器

使用 admission webhook 编写一个准入控制器其实很简单,它就和你日常写 web server 一样:

// Start 启动服务
func (whsrv *WebhookServer) Start() error {
    mux := http.NewServeMux()
    mux.HandleFunc("/mutate", whsrv.serve)
    whsrv.server.Handler = mux

    if err := whsrv.server.ListenAndServeTLS("", ""); err != nil {
        return fmt.Errorf("Failed to listen and serve webhook server: %v", err)
    }

    return nil
}

在收到一个 AdmissionReview 请求的时候,将其中的 AdmissionRequest 对象携带的资源对象取出来进行校验或变更:

var body []byte
if r.Body != nil {
    if data, err := ioutil.ReadAll(r.Body); err == nil {
        body = data
    }
}

if len(body) == 0 {
    log.Error("empty body")
    http.Error(w, "empty body", http.StatusBadRequest)
    return
}

// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
    log.Errorf("Content-Type=%s, expect application/json", contentType)
    http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
    return
}

var admissionResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
    log.Errorf("Can't decode body: %v", err)
    admissionResponse = &v1beta1.AdmissionResponse{
        Result: &metav1.Status{
            Message: err.Error(),
        },
    }
} else {
    admissionResponse = whsrv.mutate(&ar)
}

然后生成 AdmissionResponse, 通过 AdmissionReview 进行响应即可:

admissionReview := v1beta1.AdmissionReview{}
if admissionResponse != nil {
    admissionReview.Response = admissionResponse
    if ar.Request != nil {
        admissionReview.Response.UID = ar.Request.UID
    }
}

resp, err := json.Marshal(admissionReview)
if err != nil {
    log.Errorf("Can't encode response: %v", err)
    http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
}
log.Infoln("Ready to write response ...")
if _, err := w.Write(resp); err != nil {
    log.Errorf("Can't write response: %v", err)
    http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}

完整的代码可以参考上面的 lazykube 项目。

这样我们写好之后就可以开始部署了。我们需要告诉 Kubernetes 我们的 admission webhook 的地址,以及我们要操作的资源对象,因此我们创建一个 MutatingWebhookConfiguration 或者 ValidatingWebhookConfiguration 对象,比如 lazykube 就是对创建 Pod 进行变更操作:

apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: lazykube-webhook-cfg
  namespace: kube-system
  labels:
    app: lazykube
webhooks:
  - name: lazykube.myway5.com
    clientConfig:
      service:
        name: lazykube-webhook-svc
        namespace: kube-system
        path: "/mutate"
      caBundle: ××××××××
    rules:
      - operations: [ "CREATE" ]
        apiGroups: [""]
        apiVersions: ["v1"]
        resources: ["pods"]

然后为了让 Kubernetes 通过 HTTPS 进行通信,并信任我们的证书,我们需要让 Kubernetes 集群为我们的签发证书。具体的方法可以参考这里:kubernetes 集群中的证书签发。最后使用 Deployment 部署我们的服务即可。

kubernetes 集群中的证书签发

场景

在我们使用 kubernetes 的 admission webhook 机制实现一些集群资源认证、修改的方案时,会涉及到集群内部的 https 通信。这就涉及到我们的服务需要配置证书,并且要让 kubernetes 的组件信任该证书。我们都知道 kubernetes 集群中所有的证书都是由一个自定义的 CA 签发的,并且 kubernetes 集群都信任该 CA,因此基于该原理,使用 kubernetes 提供的 CertificateSigningRequest 来为我们的证书签名即可。

具体流程

kubernetes 官方的文档上提供了比较详细的说明,文档地址在这里:管理集群中的 TLS 认证

大致流程如下:

  • 使用 cfssl 为我们的 service 地址创建证书
  • 使用 CertificateSigningRequest 请求 kubernetes 的 CA 来为该证书签名
  • 管理员通过 kubectl certificate approve 来批准请求
  • 通过 kubectl get csr 获取签名后的证书,并使用到我们的服务中。

当然这个过程还不够自动化,我们可以使用一些很好的脚本来帮助我们完成这个工作:

这个脚本使用 openssl 来创建公私钥,创建 CertificateSigningRequest 请求对公钥签名,然后自动批准并将私钥和签名后的公钥存到 secret 中。使用方式如下:

./webhook-create-signed-cert.sh \
    --service lazykube-webhook-svc \
    --secret lazykube-webhook-certs \
    --namespace kube-system

这个脚本其实跟证书签发没有太大关系,但是如果你使用 mutatingwebhook 的话正好可以使用它,同理 validatingwebhook 也是如此

cat mutatingwebhook.yaml | \
    ./webhook-patch-ca-bundle.sh > \
    mutatingwebhook-ca-bundle.yaml

一些问题

我在使用上述脚本的时候,发现使用 rancher 创建的 kubernetes 集群有问题。这是因为 rancher 创建的 kubernetes 集群没有默认开启 kubernetes controller manager 的签名选项。具体的选项可以参考这里:给集群管理员的一个建议

修改的方案就是打开该选项,rancher 中可以在界面上编辑集群的 yaml 文件,加上以下参数:

services:
  kube-controller: 
    extra_args: 
      cluster-signing-cert-file: "/etc/kubernetes/ssl/kube-ca.pem"
      cluster-signing-key-file: "/etc/kubernetes/ssl/kube-ca-key.pem"

kubernetes存储–FlexVolume

简介

kubernetes 使用 volume 来满足它的存储需求,它支持很多的存储系统,比如 nfs、 glusterfs、cephfs等等,但是这些存储的实现方式有一个问题,就是它们的实现代码都必须合并到 Kubernetes 的代码中(称为 in-tree),这为 kubernetes 社区带来了维护上的成本。因此,kubernetes 提出了两种 out-of-tree 的方案: FlexVolume 和 csi。通过这两种方案实现的存储功能不必合并到 kubernetes 的代码仓库,由存储系统的供应商单独维护。

FlexVolume 是这篇文章主要关注的点,FlexVolume 自 1.2 版本开始就支持了。它使用基于 exec 的模型来与驱动程序对接。用户必须在每个节点(有些情况下包括主节点)上的预定义卷插件路径中安装 FlexVolume 驱动程序的可执行文件。当需要挂载 volume 的时候,由 kubelet 执行挂载命令来挂载即可。

基于 nfs 实现 FlexVolume

在探究 FlexVolume 的实现原理之前,我们可以先看一下官方提供的基于 nfs 的例子

注: 我这里是用 minikube 启动的本地 kubernetes 集群。

为了部署基于 nfs 实现的 FlexVolume,我们首先将目录下的 nfs 复制到 deploy 文件夹下

$ cp nfs deploy

然后将 deploy/deploy.sh 中的 dummy 修改成 nfs,表示我们使用的插件脚本是 nfs 这个可执行文件。

接着在 deploy 文件夹下构建 docker 镜像,这里要修改 Dockerfile,将 nfs COPY 到镜像中。然后执行下面的命令(镜像标签需要修改成你自己的):

$ docker build -t joyme/nfs-flexvolume:1.0 .
$ docker push joyme/nfs-flexvolume:1.0

镜像构建并推送完成之后,我们就开始部署了。因为 FlexVolume 要求将驱动文件放在指定的目录下,最粗暴的方式就是手动将文件 scp 到集群的每个节点上。这里为了方便,我们还可以使用 kubernetes 的 Daemenset,然后使用 hostPath 将文件放到主机之上。我们修改 deploy 文件夹下的 ds.yaml 这个部署文件。将我们刚刚推送的镜像填进去。然后执行以下命令进行部署。

$ kubectl apply -f ds.yaml

这里有个地方要注意, 默认的插件安装地址是 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/, 但是 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 都可以修改这个值,如果你启动这些组件是指定了这些参数,那就需要修改 ds.yaml 中的路径。

在集群中部署完成之后,我们可以到某个节点上检查一下/usr/libexec/kubernetes/kubelet-plugins/volume/exec/是否存在我们刚刚部署的文件。

最后我们创建一个 nginx,挂载一个 FlexVolume。在创建之前,我们需要先启动一个 nfs server,这里为了方便,可以使用容器启动一个。

$ docker run -d --privileged --restart=always \
-v /tmp:/dws_nas_scratch \
-e NFS_EXPORT_DIR_1=/dws_nas_scratch \
-e NFS_EXPORT_DOMAIN_1=\* \
-e NFS_EXPORT_OPTIONS_1=ro,insecure,no_subtree_check,no_root_squash,fsid=1 \
-p 111:111 -p 111:111/udp \
-p 2049:2049 -p 2049:2049/udp \
-p 32765:32765 -p 32765:32765/udp \
-p 32766:32766 -p 32766:32766/udp \
-p 32767:32767 -p 32767:32767/udp \
fuzzle/docker-nfs-server:latest

使用官方提供的 nginx-nfs.yaml 文件,然后把其中的 server 地址修改一下,使用以下命令创建:

$ kubectl apply -f nginx-nfs.yaml

注意:如果出现错误,可以检查 node 上是否安装了 jq, nfs-common 等必要的依赖包。

实现原理

在完成上面例子的过程中,关于 FlexVolume 的大多数问题都比较好解答了。我们来看一下 nfs 的实现代码:

usage() {
    err "Invalid usage. Usage: "
    err "\t0 init"
    err "\t0 mount <mount dir> <json params>"
    err "\t0 unmount <mount dir>"
    exit 1
}

err() {
    echo -ne* 1>&2
}

log() {
    echo -ne * >&1
}

ismounted() {
    MOUNT=`findmnt -n{MNTPATH} 2>/dev/null | cut -d' ' -f1`
    if [ "{MOUNT}" == "{MNTPATH}" ]; then
        echo "1"
    else
        echo "0"
    fi
}

domount() {
    MNTPATH=1

    NFS_SERVER=(echo 2 | jq -r '.server')
    SHARE=(echo 2 | jq -r '.share')

    if [(ismounted) -eq 1 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    mkdir -p {MNTPATH} &> /dev/null

    mount -t nfs{NFS_SERVER}:/{SHARE}{MNTPATH} &> /dev/null
    if [ ? -ne 0 ]; then
        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount{NFS_SERVER}:{SHARE} at{MNTPATH}\"}"
        exit 1
    fi
    log '{"status": "Success"}'
    exit 0
}

unmount() {
    MNTPATH=1
    if [(ismounted) -eq 0 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    umount {MNTPATH} &> /dev/null
    if [? -ne 0 ]; then
        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at {MNTPATH}\"}"
        exit 1
    fi

    log '{"status": "Success"}'
    exit 0
}

op=1

if ! command -v jq >/dev/null 2>&1; then
    err "{ \"status\": \"Failure\", \"message\": \"'jq' binary not found. Please install jq package before using this driver\"}"
    exit 1
fi

if [ "op" = "init" ]; then
    log '{"status": "Success", "capabilities": {"attach": false}}'
    exit 0
fi

if [# -lt 2 ]; then
    usage
fi

shift

case "op" in
    mount)
        domount*
        ;;
    unmount)
        unmount $*
        ;;
    *)
        log '{"status": "Not supported"}'
        exit 0
esac

exit 1

其实就是一段 shell 脚本,支持三个命令: init、mount、unmount。当我们在集群中为某个 pod 挂载 FlexVolume时,该 pod 所在节点的 kubelet 会调用其指定的插件脚本执行 mount 命令,然后挂载给 pod 使用。当然了,FlexVolume 还支持更复杂的插件。这个可以看官方的文档: flexvolume

部署方案

关于如何部署 FlexVolume 的插件,其实在例子中也有提到,这里可以总结一下:

  • 手动部署到每个节点的指定目录下,比如我们刚刚部署的 nfs ,其实际路径是: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/k8s~nfs。其中 /usr/libexec/kubernetes/kubelet-plugins/volume/exec 是默认路径,也可以通过 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 来指定。k8s~nfs 这个路径中,k8s 是供应商, nfs 是驱动名称,在使用的时候可以这样指定: `driver: “k8s/nfs”。

  • 使用 kubernetes 的 deamonset 配合 hostPath 来部署,因为 daemonset 会在每个节点上都启动 pod,然后通过 hostPath 将插件放在指定的位置即可。kubernetes 集群中 master 节点可能被设置成不允许调度。这种情况下 daemonset 默认不调度到 master 节点上,可以使用 tolerations 来解决这个问题. 具体可参考: Scheduler is not scheduling Pod for DaemonSet in Master node

  • 其实除了 kubelet 要调用插件之外,controller-manager 也要调用。比如执行 init, attach, detach, waitforattach, isattached 等命令。

argo的输入输出源代码分析

简介

argo是一个工作流的调度引擎,支持 Steps 和 DAG 这两种工作流。

  • Steps: 是按照步骤,从前往后的工作流调度方案。工作流中的每一步都只依赖上一步的结果
  • DAG: 全称是 directed acyclic graph,译为有向无环图。与 Steps 的区别在于每一步可能依赖之前的多步输出,但是不会循环依赖(也就是无环)

不论是在什么类型的工作流上,argo都抽象出了两种输入输出:

  • parameters: 通常情况下都是字符串,该字符串可以来源于标准输出,也可以来源于文件的内容
  • artifacts: 可以理解成文件

输入输出是连接整个工作流的核心。每一步都可以看作是一次函数调用。那么在argo中,它是如何实现在多步之间输入输出的传输呢?下面会通过源代码进行分析。

在看代码之前,可以看一个 argo 的工作流中的一个pod,为了查看更方便,我删除一些不需要关注的字段:

$ kubectl -n workflow describe pods custom-workflow-111-2fw2f-2639432629

Name:           custom-workflow-111-2fw2f-2639432629
Namespace:      workflow
Labels:         pipeline.starx.com/nodeID=743
                workflows.argoproj.io/completed=true
                workflows.argoproj.io/workflow=custom-workflow-111-2fw2f
Annotations:    cni.projectcalico.org/podIP: 10.42.0.83/32
                workflows.argoproj.io/node-name: custom-workflow-111-2fw2f.yolov3-evaluate-743
                workflows.argoproj.io/outputs:
                  {"result":...
                workflows.argoproj.io/template:
                  {"name":"yolov3-evaluate-743","inputs":{"parameters":[{"name":"userParam","value":"eyJTY29yZVRocmVzaG9sZCI6MC41LCJJb3VfVGhyZXNob2xkIjowLjQ...
Controlled By:  Workflow/custom-workflow-111-2fw2f
Init Containers:
  init:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      init
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/inputs/artifacts from input-artifacts (rw)
      /argo/podmetadata from podmetadata (rw)
      /argo/staging from argo-staging (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
Containers:
  wait:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      wait
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/podmetadata from podmetadata (rw)
      /mainctrfs/argo/staging from argo-staging (rw)
      /mainctrfs/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /mainctrfs/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
      /var/run/docker.sock from docker-sock (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
  main:
    Image:         registry.cn-shanghai.aliyuncs.com/xinhuodev/wt:0.4
    Command:
      sh
    Args:
      /argo/staging/script
    Mounts:
      /argo/staging from argo-staging (rw)
      /tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
Volumes:
  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations
  docker-sock:
    Type:          HostPath (bare host directory volume)
    Path:          /var/run/docker.sock
    HostPathType:  Socket
  input-artifacts:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  argo-staging:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  default-token-lfk5b:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-lfk5b
    Optional:    false

我们需要关注的信息有:

  • Pod 的 Annotations
  • Init Containers 启动的初始化容器
  • Containers 中的 wait 容器和 main 容器
  • Pod 的 Volumes 和每个容器的 Mounts

Init 容器

argo 创建的 Pod 的初始化容器执行了 argoexec init 命令,从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go, 我们的分析也从这里开始。在执行 argo exec init之后,第一个调用的函数应该是loadArtifacts()。这个方法中做了三件事: initExecutor()wfExecutor.StageFiles()wfExecutor.LoadArtifacts()

initExecutor:

initExecutor 的代码如下(删除了不重要的代码):

func initExecutor() *executor.WorkflowExecutor {
    tmpl, err := executor.LoadTemplate(podAnnotationsPath)

    var cre executor.ContainerRuntimeExecutor
    switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
    case common.ContainerRuntimeExecutorK8sAPI:
        cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
    case common.ContainerRuntimeExecutorKubelet:
        cre, err = kubelet.NewKubeletExecutor()
    case common.ContainerRuntimeExecutorPNS:
        cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
    default:
        cre, err = docker.NewDockerExecutor()
    }

    wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
    yamlBytes, _ := json.Marshal(&wfExecutor.Template)
    return &wfExecutor
}

podAnnotationsPath加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations,这正好是 init 容器的挂载,而这个挂载对应的卷是:

 podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations

这里的 DownwardAPI 也解释一下,它是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。

接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor实例化了一个 wfExecutor

StageFiles()

源代码如下:

func (we *WorkflowExecutor) StageFiles() error {
    var filePath string
    var body []byte
    switch we.Template.GetType() {
    case wfv1.TemplateTypeScript:
        log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
        filePath = common.ExecutorScriptSourcePath
        body = []byte(we.Template.Script.Source)
    case wfv1.TemplateTypeResource:
        log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
        filePath = common.ExecutorResourceManifestPath
        body = []byte(we.Template.Resource.Manifest)
    default:
        return nil
    }
    err := ioutil.WriteFile(filePath, body, 0644)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return nil
}

职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script。这就是我们在 main 容器中执行的脚本了。

LoadArtifacts

// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
    for _, art := range we.Template.Inputs.Artifacts {
        artDriver, err := we.InitDriver(art)

        var artPath string
        mnt := common.FindOverlappingVolume(&we.Template, art.Path)
        if mnt == nil {
            artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
        } else {
            // If we get here, it means the input artifact path overlaps with an user specified
            // volumeMount in the container. Because we also implement input artifacts as volume
            // mounts, we need to load the artifact into the user specified volume mount,
            // as opposed to the `input-artifacts` volume that is an implementation detail
            // unbeknownst to the user.
            log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
            artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
        }

        // The artifact is downloaded to a temporary location, after which we determine if
        // the file is a tarball or not. If it is, it is first extracted then renamed to
        // the desired location. If not, it is simply renamed to the location.
        tempArtPath := artPath + ".tmp"
        err = artDriver.Load(&art, tempArtPath)
        if err != nil {
            return err
        }
        if isTarball(tempArtPath) {
            err = untar(tempArtPath, artPath)
            _ = os.Remove(tempArtPath)
        } else {
            err = os.Rename(tempArtPath, artPath)
        }

        if art.Mode != nil {
            err = os.Chmod(artPath, os.FileMode(*art.Mode))
        }
    }
    return nil
}

InitDriver是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。

FindOverlappingVolume 是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts 作为 artifacts 的基础路径。否则使用 /mainctrfs 作为路径。

下面就是下载文件,解压文件并修改权限了。

注意在这里,init、wait和main容器都挂载了input-artifactsargo-staging,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。

wait 容器

wait容器的职责有以下几点:

  • 等待 main 容器结束
  • 杀死 sidecar
  • 保存日志
  • 保存 parameters
  • 保存 artifacts
  • 获取脚本的输出流
  • 将输出放在 Annotations 上

下面我们看这些功能点的实现:

等待 main 容器结束

// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
    // WaitInit() 是初始化操作,只有 PSN 需要
    err := we.RuntimeExecutor.WaitInit()
    if err != nil {
        return err
    }
    log.Infof("Waiting on main container")
    // waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
    // 不为空说明启动了
    mainContainerID, err := we.waitMainContainerStart()
    if err != nil {
        return err
    }
    log.Infof("main container started with container ID: %s", mainContainerID)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // monitorAnnotations是因为pod的annotations会更改
    annotationUpdatesCh := we.monitorAnnotations(ctx)
    // 超时会杀死
    go we.monitorDeadline(ctx, annotationUpdatesCh)

    // 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
    err = we.RuntimeExecutor.Wait(mainContainerID)
    if err != nil {
        return err
    }
    log.Infof("Main container completed")
    return nil
}

杀死 sidecar

main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
    if len(we.Template.Sidecars) == 0 {
        log.Infof("No sidecars")
        return nil
    }
    log.Infof("Killing sidecars")
    pod, err := we.getPod()
    if err != nil {
        return err
    }
    sidecarIDs := make([]string, 0)
    // 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
    for _, ctrStatus := range pod.Status.ContainerStatuses {
        if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
            continue
        }
        if ctrStatus.State.Terminated != nil {
            continue
        }
        containerID := containerID(ctrStatus.ContainerID)
        log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
        sidecarIDs = append(sidecarIDs, containerID)
    }
    if len(sidecarIDs) == 0 {
        return nil
    }
    return we.RuntimeExecutor.Kill(sidecarIDs)
}

保存日志

argo 是支持将 main 容器中的日志持久化并保存到指定的地方的(s3, hdfs, Artifactory)。这在 argo 的文档上好像没有提到过。这一部分的逻辑比较简单,就是通过 ContainerRuntime 获取获取容器中的输出流,然后存成文件,通过 argo 中的 storage driver 保存下来。

保存 parameters

// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters() error {
    if len(we.Template.Outputs.Parameters) == 0 {
        log.Infof("No output parameters")
        return nil
    }
    log.Infof("Saving output parameters")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    // 遍历模板参数
    for i, param := range we.Template.Outputs.Parameters {
        log.Infof("Saving path output parameter: %s", param.Name)
        // Determine the file path of where to find the parameter
        if param.ValueFrom == nil || param.ValueFrom.Path == "" {
            continue
        }

        var output string
        if we.isBaseImagePath(param.ValueFrom.Path) {
            log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
            // 容器内,通过 runtime 获取
            output, err = we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
            if err != nil {
                return err
            }
        } else {
            log.Infof("Copying %s from from volume mount", param.ValueFrom.Path)
            mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
            // 容器的挂载卷,直接获取
            out, err := ioutil.ReadFile(mountedPath)
            if err != nil {
                return err
            }
            output = string(out)
        }

        outputLen := len(output)
        // Trims off a single newline for user convenience
        if outputLen > 0 && output[outputLen-1] == '\n' {
            output = output[0 : outputLen-1]
        }
        // 保存下来
        we.Template.Outputs.Parameters[i].Value = &output
        log.Infof("Successfully saved output parameter: %s", param.Name)
    }
    return nil
}

保存 artifacts

保存 artifacts 和 保存 parameters 的操作是一样的。

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
    if len(we.Template.Outputs.Artifacts) == 0 {
        log.Infof("No output artifacts")
        return nil
    }
    log.Infof("Saving output artifacts")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    err = os.MkdirAll(tempOutArtDir, os.ModePerm)
    if err != nil {
        return errors.InternalWrapError(err)
    }

    for i, art := range we.Template.Outputs.Artifacts {
        err := we.saveArtifact(mainCtrID, &art)
        if err != nil {
            return err
        }
        we.Template.Outputs.Artifacts[i] = art
    }
    return nil
}

获取脚本的输出流

直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中

func (we *WorkflowExecutor) CaptureScriptResult() error {
    if we.Template.Script == nil {
        return nil
    }
    log.Infof("Capturing script output")
    mainContainerID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }
    reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
    if err != nil {
        return err
    }
    defer func() { _ = reader.Close() }()
    bytes, err := ioutil.ReadAll(reader)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    out := string(bytes)
    // Trims off a single newline for user convenience
    outputLen := len(out)
    if outputLen > 0 && out[outputLen-1] == '\n' {
        out = out[0 : outputLen-1]
    }
    we.Template.Outputs.Result = &out
    return nil
}

将输出放在 Annotations 上

将 outputs 存在 pod 的 annotations 上。

func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
    outputs := we.Template.Outputs.DeepCopy()
    if logArt != nil {
        outputs.Artifacts = append(outputs.Artifacts, *logArt)
    }

    if !outputs.HasOutputs() {
        return nil
    }
    log.Infof("Annotating pod with output")
    outputBytes, err := json.Marshal(outputs)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

总结

init 容器做了 pod 的初始化,包括存储 script,下载 artifacts等等,这样我们的 main 容器就不用关心输入的来源,只需要在指定地方使用即可。wait 容器负责监控 main 容器的生命周期,在 main 容器中的主要逻辑运行结束之后,负责将输出部分读取,持久化,这样 main 容器就不用操心如何将该步产生的结果传到后面的步骤上的问题。

Kubernetes Pod 解析

pod 基础概念

在 Kubernetes 中, Pod 是一个非常重要的概念,它由一个或多个容器组成,这些容器共享存储、网络、进程空间,以及可以使用进程间通信。

Pod 是集群中最小的调度单位,如果把 Kubernetes 集群比作操作系统,那么 Pod 则是一个进程。一个 Pod 被创建出来之后,它会被调度到集群中的某一个节点上开始运行,Pod 中的 Container 都会在该节点上启动。

Pod 是短暂的,就跟进程一样,在被创建之后可能会随时被终止。但是 Kubernetes 会根据需求来的及时的重新创建一个 Pod,所以单独从 Pod 的层面来说,它应该是一个无状态的应用。

集群中的每个 Pod 都会有唯一的 ID (UID),这跟进程的进程号是唯一的一样。

共享命名空间

这里以 docker 在 linux 下的实现为例,docker 主要使用了 linux namespace 做的资源隔离。在 pod 中,所有的 docker 容器都可以共享同一个 network, ipc, pid命名空间,并且可以通过挂载同一个卷的方式来共享文件系统。需要注意的是,默认情况下只有 network 这个命名空间是开启的,其他的需要通过 shareProcessNamespaceSYS_PTRACEemptyDir 等字段来开启。

为了说明,可以在 kubernetes 集群中创建下面这个 pod

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  shareProcessNamespace: true
  containers:
  - name: nginx
    image: nginx
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
  - name: shell
    image: busybox
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
    securityContext:
      capabilities:
        add:
        - SYS_PTRACE
    stdin: true
    tty: true
  volumes:
  - name: cache-volume
    emptyDir: {}

上面创建的 Pod 有两个容器,一个是 nginx,另一个是 shell。我们使用以下命令进入到 shell 容器中。

kubectl exec -it nginx -c shell sh

network

为了验证同一个 Pod 下 network 是共享的,可以使用以下命令验证

$ wget localhost
Connecting to localhost (127.0.0.1:80)
saving to 'index.html'
index.html           100% |******************************************|   612  0:00:00 ETA
'index.html' saved

很明显,这里的 localhost 指向了 nginx 容器。

pid

$ ps -el
PID   USER     TIME  COMMAND
    1 root      0:00 /pause
    6 root      0:00 nginx: master process nginx -g daemon off;
   11 101       0:00 nginx: worker process
   12 root      0:00 sh
   20 root      0:00 sh
   26 root      0:00 ps

在 shell 容器中查看进程可以看到 /pause 和 nginx 等进程。因为共享了 pid 命名空间,所以可以看到其他容器的进程。这里的 pause 是一个很特殊的进程,在后文章会单独解释。

ipc

$ kill -9 11
$ ps -el
PID   USER     TIME  COMMAND
    1 root      0:00 /pause
    6 root      0:00 nginx: master process nginx -g daemon off;
   12 root      0:00 sh
   20 root      0:00 sh
   29 101       0:00 nginx: worker process
   30 root      0:00 ps -el

接着上面的命令,我们杀死了 nginx 的 worker 进程,nginx master 进程又重启了 worker,重启后的 worker PID 是 29。可以在 shell 容器中使用信号杀死 nginx 中的进程,说明 IPC 命名空间是共享的。

shared volume

$ cd cache
$ touch test
$ kubectl exec -it nginx -c nginx sh
$ ls /cache
test

我们在 shell 容器中 cache 文件夹下创建了文件 test, 在 nginx 容器中也能看到,说明两个容器可以共享文件系统的某些目录。

容器探针

之所以特地提到容器探针是因为容器探针是一个非常好的检查服务是否正确运行的方式。

TODO: 几种探针的使用场景和

探针是由 kubelet 周期性对容器执行的诊断措施。kubernetes 提供了三种方式:

  • ExecAction: 在容器中执行命令,如果命令的 exit code 是 0 则代表成功。
  • TCPSocketAction: 在容器的ip和端口上执行 tcp 连接检查,如果端口是打开的则表明诊断成功。
  • HTTPGetAction: 在容器的ip和制定端口和路径上执行,如果返回的状态码大于等于 200 ,小于400就表明成功。

直到 kubernetes v1.16 止,共有三种探针可以使用,分别是 livenessProbe, readinessProbe, startupProbe.

  • livenessProbe: 检查容器是否在运行,如果 liveness 探针失败,kubelet 会杀死这个容器,这个容器会遵循它的重启策略。如果一个容器没有提供 liveness 探针,默认状态是 Success

  • readinessProbe: 表明容器是否准备好接收请求了。如果 readiness 探针失败,endpoints 控制器会从符合这个 Pod 的所有 service 的 endpoint 列表中移除该 Pod。默认状态是 Failure。如果容器没有提供 readiness 探针,默认状态就是 Success

  • startupProbe: 表明容器中的应用是否启动完成。如果提供了 startup 探针,其他的探针都被禁用直到 startup 探针成功。如果 startup 探针失败,kuberlet 杀死容器,容器会遵循它的重启策略。是否容器没有提供 startup 探针,默认状态是 Success

init container

我们都知道 Pod 可以有多个容器,其中 init 容器是比较特殊的一个,它由 spec.initContainers 指定,与普通容器不同的是,只有在 init 容器运行完成之后,Kubernetes 才会初始化 Pod 和运行应用容器。

实际应用中,init 容器的职责基本上都是和它名字描述的一样,用来做初始化用。比如在 argo 这个工作流调度应用中,它会为每个调度的 Pod 初始化一个 init 容器,用来载入该步骤需要使用的文件资源等等。

pause container

在上面提到了 pid namespace 共享中,有一个 PID 为 1 的 Pause 进程,这就是现在提到的 pause container,pause container 对 kubernetes 用户是不感知的。但是我们在 kubernetes 节点上使用 docker ps来查看,会发现很多的 pause 容器。pause 容器的作用主要有两点:

  • 在 pod 中作为 linux namespace 共享的基础容器
  • 在 PID namespace 共享的前提下,作为每个 pod 中的PID 1,然后回收僵尸进程

为了研究pause的作用,可以在电脑上执行以下的命令:

docker run -d --ipc=shareable --name pause -p 8080:80 warrior/pause-amd64:3.0

docker run -d --name nginx -v /home/jiang/projects/testk8s/nginx.conf:/etc/nginx/nginx.conf --net=container:pause --ipc=container:pause --pid=container:pause nginx

docker run -d --name ghost --net=container:pause --ipc=container:pause --pid=container:pause ghost

nginx.conf 如下:

error_log stderr;
events {worker_connections 1024;}
http {
    access_log /dev/stdout combined;
    server {
        listen 80 default_server;
        server_name example.com;
        location / {
            proxy_pass http://127.0.0.1:2368;
        }
    }
}

我们首先启动了一个 pause 容器,并且开始了 ipc 的共享。然后又启动了 nginx 和 ghost 容器,并且这两个容器都加入了 pause 的network、ipc和pid命名空间。

在浏览器中打开地址: http://localhost:8080, 发现打开了 ghost 博客网页。我们在容器 pause 中开启的 8080 端口,然后经过 nginx 容器代理到了 ghost 容器。我们的应用容器 pause 容器完成了命名空间的共享。

我们再来看一下 pause 的代码:

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static void sigdown(int signo) {
  psignal(signo, "Shutting down, got signal");
  exit(0);
}

static void sigreap(int signo) {
  while (waitpid(-1, NULL, WNOHANG) > 0);
}

int main() {
  if (getpid() != 1)
    /* Not an error because pause sees use outside of infra containers. */
    fprintf(stderr, "Warning: pause should be the first process\n");

  if (sigaction(SIGINT, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 1;
  if (sigaction(SIGTERM, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 2;
  if (sigaction(SIGCHLD, &(struct sigaction){.sa_handler = sigreap,
                                             .sa_flags = SA_NOCLDSTOP},
                NULL) < 0)
    return 3;

  for (;;)
    pause();
  fprintf(stderr, "Error: infinite loop terminated\n");
  return 42;
}

这段代码在监听了三个信号量,在 SIGINT 和 SIGTERM 时调用 sigdown() 来退出。在接收到 SIGCHLD 信号时使用 waitpid,因为 pause 进程的 PID 是1,所以所有的僵尸进程都会被挂到 pause 进程之下,因此 waitpid 可以回收僵尸进程。

multi containers design pattern

在大多数情况下,Pod 往往只有一个容器,因为一个 Pod 的职责是唯一的。但是同样的,也有一些值得借鉴的多容器设计模式。

常用的模式有三种: sidecar, adapter, ambassador, 下图是常见的三种设计模式图,图片来源于网络:

multi container pod design

sidecar 模式

在 sidecar 模式中,通常有一个主要的容器A–比如我们的 web 应用,然后有另外一个重要的容器B,负责处理 A 容器的一些功能,但是 B 容器又不是必须的。这个 B 容器我们通常称它为 sidecar 容器。

常见的 sidecar 容器有 日志,同步服务,监控等职责。当应用容器不在运行时,日志容器的运行是没有意义的,所以我们通常会创建一个 Pod 包含主要的容器和一个 sidecar 容器,来协同工作。这样的好处就是减少应用容器的功能需求,将通用的功能交给 sidecar 容器去执行,而又不会侵入应用容器。

adapter 模式

adapter 模式就是程序设计中常用的适配器模式,负责将应用容器中一些不兼容的功能调整成兼容的格式。比如一个大型系统中有很多小的系统,每个系统输出的日志格式都不同。而我们的统一监控系统只接受一种日志格式。这时候就可以使用 adapter 模式在 Pod 的加入一个负责适配的容器,将各种格式的日志调整成相同的统一发送给日志系统。

ambassador 模式

ambassador 模式常用来将应用容器连接到容器之外的网络。比如数据库,我们的应用容器只负责连接 localhost 的地址,然后由 ambassador 容器判断当前的环境,将应用容器的数据库请求代理到不同的数据库上。这样,我们在开发环境,测试环境,生产环境都只需要一套配置。

pod lifecycle

pod 的状态包含一个 phase 属性,这个属性用来描述当前 pod 的状态,可能的值有:

  • Pending: Pod 已经被 Kubernetes 系统接受,但是有一个或多个容器镜像尚未创建。等待时间包括调度 Pod 的时间和通过网络下载镜像的时间。
  • Running: Pod 已经绑定到一个节点上,Pod 中所有的容器都已经被创建,至少有一个容器正在运行,或者正处于启动或重启状态。
  • Succeeded: Pod中的所有容器都被成功终止,并且不会再重启。
  • Failed: Pod中所有容器都已经终止,并且至少有一个容器是因为失败终止。也就是说,容器以非0状态退出或被系统终止。
  • Unknown: 因为某些原因无法取得 Pod 的状态,通过是因为与 Pod 所在的主机通信失败。

下图是一个 Pod 的生命周期状态,图片来源于网络:

kubernetes-pod-life-cycle

在 Pod 的整个生命周期中,我们可以通过容器的生命周期钩子来在某些阶段处理一些工作。

  • PostStart: 当容器被创建的时候,这个钩子会立刻执行。

  • PreStop: 当容器退出时执行

在两种钩子触发时我们可以选择调用脚本执行还是发送HTTP请求。

参考资料

Pods-Kubernetes

Pod状态与生命周期管理

The Almighty Pause Container

The Distributed System Toolkit: Patterns for Composite Containers

Container Design Patterns

Multi-Container Pod Design Patterns in Kubernetes

tensorflow-serving 在k8s中的模型部署方案

简介

tensorflow-serving是一个tensorflow模型部署的方案,其在设计时,就考虑了非常灵活的设计,比如:

  • 支持不同的文件系统,并且易扩展
  • 将模型发现、加载、使用和卸载和模型生命周期的管理,以及对外提供服务解耦合,因此非常容易扩展它的模型发现方式,以及同样可以支持其他框架下模型的整合。
  • 整个服务是无状态的,因此方便在k8s上进行部署

下图是 tensorflow serving 的整体架构:

tensorflow serving architecture

模型加载方式

tensorflow-serving支持从不同的地方,以不同的方式去加载模型。比如我们可以直接在启动tensorflow-serving时加上模型的地址,也可以提供模型配置文件来启动服务。

启动时加上参数:

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=/home/jiang/data/yolov3

从配置文件中加载模型:

/etc/config/models.config

model_config_list {
    config {
        name: 'fashion'
        base_path: 's3://models/fashion/'
        model_platform: 'tensorflow'
    }
    config {
        name: 'resnet'
        base_path: 's3://models/resnet/'
        model_platform: 'tensorflow'
    }
}

执行以下命令来加载fashionresnet两个模型:

tensorflow_model_server --port=9000", "--rest_api_port=8500", "--model_config_file=/etc/config/models.config"

模型存储系统

tensorflow-serving的另一个特点就是支持从不同类型的存储系统中加载模型。比如本地的文件系统、s3、hdfs等等

从本地文件系统中加载

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=/home/jiang/data/yolov3

从s3加载

从s3(兼容s3的对象存储系统都可以)中加载模型,需要配置一些环境变量

export AWS_ACCESS_KEY_ID=<key id>
export AWS_SECRET_ACCESS_KEY=<key>
export S3_ENDPOINT=minio-service.minio:9000
export S3_USE_HTTPS=0
export S3_VERIFY_SSL=0
export AWS_REGION=us-west-1
export S3_REGION=us-west-1
export AWS_LOG_LEVEL=3

然后通过以下命令启动服务即可

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=s3://models/resnet/

从hdfs中加载

从hdfs中加载需要设置以下的环境变量

  • JAVA_HOME: Java 的安装路径

  • HADOOP_HDFS_HOME: HDFS 的安装路径,如果在LD_LIBRARY_PATH中设置了 libhdfs.so 的路径,那么这个环境变量可以不要。

  • LD_LIBRARY_PATH: 引入 libjvm.so 的路径。如果你的 HADOOP 发行版在 ${HADOOP_HDFS_HOME}/lib/native 这个目录下没有包含 libhdfs.so,也需要引入它。

export LD_LIBRARY_PATH={LD_LIBRARY_PATH}:{JAVA_HOME}/jre/lib/amd64/server
  • CLASSPATH: 注意仅仅是设置 CLASSPATH 环境变量是不行的,需要用以下的方式使用:
CLASSPATH=({HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=yolov3 --model_base_path=hdfs://worknode2:9000/pipeline/models/yolov3

在k8s中部署

s3

tensorflow-serving 官方提供了docker镜像,因此使用 s3 的方式加载模型部署是很简单的。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tfserving-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: tfserving
    spec:
      containers:
      - name: serving-container
        image: tensorflow/serving:1.14.0  
        ports:
        - containerPort: 8500
        - containerPort: 9000
        env:
        - name: AWS_ACCESS_KEY_ID
          value: J5WW5NKKV7AE9S0WZCM1
        - name: AWS_SECRET_ACCESS_KEY
          value: TbG0Y6nnUV8nQNLL9n4B3u3UPMMCJvqs2COx3and
        - name: S3_ENDPOINT
          value: minio-service.minio:9000
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_REGION
          value: us-west-1
        - name: S3_REGION
          value: us-west-1
        - name: AWS_LOG_LEVEL
          value: "3"
        command: ["/usr/bin/tensorflow_model_server"]
        args: ["--port=9000", "--rest_api_port=8500", "--model_name=resnet", "--model_base_path=s3://models/resnet/"]

---
apiVersion: v1
kind: Service
metadata:
  labels:
    run: tf-service 
  name: tf-service
spec:
  ports:
  - name: rest-api-port
    port: 8500
    targetPort: 8500
  - name: grpc-port
    port: 9000
    targetPort: 9000
  selector:
    app: tfserving 
  type: NodePort

hdfs

在官方提供的 docker 镜像中,并没有打包 hdfs 的环境,因此我们需要自己构建一个镜像:

Dockerfile 所在目录如下:

hdfs_dockerfile
├── Dockerfile
└── hadoop-2.10.0
    ├── bin
    ├── etc
    ├── include
    ├── lib
    ├── libexec
    ├── LICENSE.txt
    ├── logs
    ├── NOTICE.txt
    ├── README.txt
    ├── sbin
    └── share

Dockerfile 如下:

FROM tensorflow/serving:1.14.0

RUN apt update && apt install -y openjdk-8-jre

COPY hadoop-2.10.0 /root/hadoop

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
ENV HADOOP_HDFS_HOME /root/hadoop
ENV LD_LIBRARY_PATH {LD_LIBRARY_PATH}:{JAVA_HOME}/jre/lib/amd64/server

RUN echo '#!/bin/bash \n\n\
CLASSPATH=({HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) tensorflow_model_server --port=8500 --rest_api_port=9000 \
--model_name={MODEL_NAME} --model_base_path={MODEL_BASE_PATH}/{MODEL_NAME} \
"@"' > /usr/bin/tf_serving_entrypoint.sh \
&& chmod +x /usr/bin/tf_serving_entrypoint.sh

EXPOSE 8500
EXPOSE 9000
ENTRYPOINT ["/usr/bin/tf_serving_entrypoint.sh"]

进行构建:

docker build -t tensorflow_serving:1.14-hadoop-2.10.0 .

运行:

docker run -p 9000:9000 --name tensorflow-serving -e MODEL_NAME=yolov3 -e MODEL_BASE_PATH=hdfs://192.168.50.166:9000/pipeline/models -t tensorflow_serving:1.14-hadoop-2.10.0

这样将上面的部署文件稍微修改一下即可使用。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tfserving-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: tfserving
    spec:
      containers:
      - name: serving-container
        image: joyme/tensorflow_serving:1.14-hadoop-2.10.0
        ports:
        - containerPort: 8500
        - containerPort: 9000
        env:
        - name: MODEL_NAME
          value: yolov3
        - name: MODEL_BASE_PATH
          value: hdfs://192.168.50.166:9000/pipeline/models

---
apiVersion: v1
kind: Service
metadata:
  labels:
    run: tf-service 
  name: tf-service
spec:
  ports:
  - name: rest-api-port
    port: 8500
    targetPort: 8500
  - name: grpc-port
    port: 9000
    targetPort: 9000
  selector:
    app: tfserving 
  type: NodePort

模型调用

tensorflow-serving 支持两种方式调用模型进行预测: GRPC 和 RESTful api

GRPC的方式如下:

from __future__ import print_function

import grpc
import requests
import tensorflow as tf

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

IMAGE_URL = 'https://tensorflow.org/images/blogs/serving/cat.jpg'

tf.app.flags.DEFINE_string('server', '192.168.50.201:30806', 'PredictionService host:port')
tf.app.flags.DEFINE_string('image', '', 'path to image in jpeg format')
FLAGS = tf.app.flags.FLAGS

def main(_):
    if FLAGS.image:
        with open(FLAGS.image, 'rb') as f:
            data = f.read()
    else:
        dl_request = requests.get(IMAGE_URL, stream=True)
        dl_request.raise_for_status()
        data = dl_request.content

    channel = grpc.insecure_channel(FLAGS.server)
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

    # Send request
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'resnet'
    request.model_spec.signature_name = 'serving_default'
    request.inputs['image_bytes'].CopyFrom(
            tf.contrib.util.make_tensor_proto(data, shape=[1]))

    result = stub.Predict(request, 10.0) # 10 secs timeout
    print(result)

if __name__ == '__main__':
    tf.app.run()

RESTful API的方式如下

import requests
import json
import base64

with open("cat.jpg", "rb") as image_file:
    encoded_string = base64.b64encode(image_file.read())

headers = {"content-type": "application/json"}
body = {
        "instances": [
            {'b64': encoded_string}
           ]
        }
r = requests.post('http://192.168.50.201:32063/v1/models/resnet:predict', data = json.dumps(body), headers = headers)

print(r.text)

容器标准化

概述

我认为容器标准化可以分为两个角度去讲:

一个是容器的使用和镜像的格式需要规范,这叫做OCI(open container initiative),也就是说,不同技术实现的容器,都可以使用同一种方式运行,同一个镜像也可以在不同的容器技术上运行。

另外一个就是因为Kubernetes的流行,Kubernetes推出了一个CRI(container runtime interface)的接口规范,凡是直接或间接实现了这个接口规范的容器都可以作为Kubernetes的默认容器运行时。

OCI和CRI的制定也意味着容器技术迎来了高速发展。

CRI: container runtime interface

CRI是kubernetes推出的容器运行时接口,有了CRI,不论各种容器化技术是如何实现的,都可以用一个共同的接口对外提供服务。CRI中定义了容器和镜像的接口的接口,基于gRPC调用。具体的可以查看api.proto。下面简单的列一下:

// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // TODO: what must the runtime do after the grace period is reached?
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService defines the public APIs for managing images.
service ImageService {
    // ListImages lists existing images.
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    // ImageStatus returns the status of the image. If the image is not
    // present, returns a response with ImageStatusResponse.Image set to
    // nil.
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    // PullImage pulls an image with authentication config.
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    // RemoveImage removes the image.
    // This call is idempotent, and must not return an error if the image has
    // already been removed.
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    // ImageFSInfo returns information of the filesystem that is used to store images.
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}

共包含了两个服务:
– RuntimeService:容器和Sandbox运行时管理。
– ImageService:提供了从镜像仓库拉取、查看、和移除镜像的RPC。

再看一下CRI的架构图:

cri architecture

在kubernetes中,CRI扮演了kubelet和container runtime的通信桥梁。也因为CRI的存在,container runtime和kubelet解耦,就有了多种选择,比如: docker、 CRI-O、containerd、frakti等等。

OCI: open container initiative

这个是由docker和其他的公司推动的容器标准,为了围绕容器格式和运行时制定一个开放的工业化标准,目前主要有两个标准文档:容器运行时标准 (runtime spec)和 容器镜像标准(image spec)。这两个协议通过 OCI runtime filesytem bundle 的标准格式连接在一起,OCI 镜像可以通过工具转换成 bundle,然后 OCI 容器引擎能够识别这个 bundle 来运行容器

oci

下面引用一下其他博客的文字(https://www.jianshu.com/p/62e71584d1cb):

设计考量

操作标准化:容器的标准化操作包括使用标准容器创建、启动、停止容器,使用标准文件系统工具复制和创建容器快照,使用标准化网络工具进行下载和上传。

内容无关:内容无关指不管针对的具体容器内容是什么,容器标准操作执行后都能产生同样的效果。如容器可以用同样的方式上传、启动,不管是PHP应用还是MySQL数据库服务。

基础设施无关:无论是个人的笔记本电脑还是AWS S3,亦或是OpenStack,或者其它基础设施,都应该对支持容器的各项操作。
为自动化量身定制:制定容器统一标准,是的操作内容无关化、平台无关化的根本目的之一,就是为了可以使容器操作全平台自动化。

工业级交付:制定容器标准一大目标,就是使软件分发可以达到工业级交付成为现实

image spec(容器标准包)

OCI 容器镜像主要包括几块内容:

文件系统:以 layer 保存的文件系统,每个 layer 保存了和上层之间变化的部分,layer 应该保存哪些文件,怎么表示增加、修改和删除的文件等

config 文件:保存了文件系统的层级信息(每个层级的 hash 值,以及历史信息),以及容器运行时需要的一些信息(比如环境变量、工作目录、命令参数、mount 列表),指定了镜像在某个特定平台和系统的配置。比较接近我们使用 docker inspect 看到的内容

manifest 文件:镜像的 config 文件索引,有哪些 layer,额外的 annotation 信息,manifest 文件中保存了很多和当前平台有关的信息

index 文件:可选的文件,指向不同平台的 manifest 文件,这个文件能保证一个镜像可以跨平台使用,每个平台拥有不同的 manifest 文件,使用 index 作为索引

runtime spec(容器运行时和生命周期)

容器标准格式也要求容器把自身运行时的状态持久化到磁盘中,这样便于外部的其它工具对此信息使用和演绎。该运行时状态以JSON格式编码存储。推荐把运行时状态的JSON文件存储在临时文件系统中以便系统重启后会自动移除。

基于Linux内核的操作系统,该信息应该统一地存储在/run/opencontainer/containers目录,该目录结构下以容器ID命名的文件夹(/run/opencontainer/containers//state.json)中存放容器的状态信息并实时更新。有了这样默认的容器状态信息存储位置以后,外部的应用程序就可以在系统上简便地找到所有运行着的容器了。

state.json文件中包含的具体信息需要有:

版本信息:存放OCI标准的具体版本号。

容器ID:通常是一个哈希值,也可以是一个易读的字符串。在state.json文件中加入容器ID是为了便于之前提到的运行时hooks只需载入state.json就- – 可以定位到容器,然后检测state.json,发现文件不见了就认为容器关停,再执行相应预定义的脚本操作。

PID:容器中运行的首个进程在宿主机上的进程号。

容器文件目录:存放容器rootfs及相应配置的目录。外部程序只需读取state.json就可以定位到宿主机上的容器文件目录。

容器创建:创建包括文件系统、namespaces、cgroups、用户权限在内的各项内容。

容器进程的启动:运行容器进程,进程的可执行文件定义在的config.json中,args项。

容器暂停:容器实际上作为进程可以被外部程序关停(kill),然后容器标准规范应该包含对容器暂停信号的捕获,并做相应资源回收的处理,避免孤儿进程的出现。

CRI和OCI的对比

OCI是容器技术的开放性标准,而CRI是Kubernetes为了更方便的支持不同的容器技术,而推出的接口标准,与CRI类似的还有CNI和CSI,分别是网络和存储的接口。

可以看一下这张图:

kubelet cri runtime

kubelet有了CRI的接口,可以通过cri-containerd和containerd通信,也可以通过docker-shim和docker通信。注意这里的cri-containerd在containerd v1.2的时候就已经不再使用了,因为containerd本身就支持了CRI的规范。

同时kubernetes还孵化了cri-o这个项目,cri-o直接打通了cri和oci。runc和kata都是oci的具体实现。

所以,用一句话理解:实现了CRI就可以保证被kubernetes使用,实现了OCI就可以在各种设备上无差别的使用各种镜像。

docker、containerd和runc

containerd从docker中分出来的一部分。containerd是负责管理容器生命周期的常驻进程,而runc则是真正负责容器运行的部分。可以通过以下的图来看三者之间的关系:

docker-containerd-runc

containerd会调用多个runc实例来管理多个容器。docker engine则是提供接口给用户使用。

kubernetes当前支持的CRI后端

containerd

containerd的地址:https://github.com/containerd/containerd

先用官网的图片来看一下containerd的架构:

containerd architecture

containerd处于os和clients之间,它使用CRI API提供给Kubelet调用,使用containerd API提供给containerd client调用,使用Metrics API提供给Prometheus监控数据。然后有一层containerd Service Interfaces提供给上层api使用。注意到其中还有一个container-shim打通了Runtime managerOCI runtime的具体实现,比如runcrunhcskata

containerd实现了以下的特性:

  • OCI Image规范的支持
  • OCI Runtime规范的支持(通过runc等)
  • Image的上传和下载
  • 容器运行时和生命周期的支持
  • 创建、修改和删除网络
  • 管理网络命名空间以及将容器加入到现有的网络命名空间
  • 全部镜像的CAS存储的多租户模式支持

cri-o

项目地址:https://github.com/cri-o/cri-o

cri-o

cri-o项目是Kubernetes CRI接口的实现,同时可以兼容OCI标准的容器运行时。这样的能力就使得它可以作为Docker的轻量级的容器运行时的替代方案,使得Kubernetes可以接入符合OCI标准的所有容器运行时,同时也减少了容器开发者们的额外工作量(只需实现OCI标准即可)。

frakti

项目地址:https://github.com/kubernetes/frakti

frakti

frakti是Kubernetes官方推出的一个容器运行时,但是不同于docker这样的利于linux namespace的技术,它是基于虚拟化技术的容器,因此可以带来更好的环境隔离以及独享的内核。

rkt

项目地址:https://github.com/rkt/rkt/

rkt-vs-docker-process-model

rkt是coreos推出的和Docker抗衡的容器产品,不同于现在的Docker往更大更全的方向,不仅仅是容器功能,更集成了Swarm这样的集群方案,rkt注重的是作为运行在linux系统上的容器组件。上图可以看出Docker的架构要更加的复杂。

docker

官网地址: https://docker.com

docker作为Kubernetes的默认容器运行时,其本身在容器领域也占据了绝对的领导地位。

实现了OCI,可以通过cri-o接入kubernetes的项目

runc

项目地址: https://github.com/opencontainers/runc

opencontainers组织推出了OCI的规范,同时也开发了runc作为OCI规范的实现。runc是docker贡献出来的容器运行时,runc不仅是containerd的默认运行时,同时也可以接入到cri-o中。

Clear Containers

https://github.com/clearcontainers/runtime,项目已经不在维护,推荐迁移到Kata Containers

Kata Containers

https://github.com/kata-containers/runtime

Kata Containers和runc这种技术栈是不同的。runc使用的是linux namespace和cgroup来做环境隔离和资源限制,缺点在于使用的仍然是宿主机的内核,这样一旦受到了内核层的影响,会扩散到所有的容器。而Kata Containers使用的是虚拟化的技术,它实际上是一个虚拟机,但是可以像容器那样使用。

Kata Containers是2017年12月启动的项目,结合了Intel Clear Containers和 Hyper.sh RunV的优点,支持不同的主流架构,除x86_64外,还支持AMD64, ARM, IBM p-series and IBM z-series。

下图是kata Containers和传统容器技术的对比:

katacontainers_traditionalvskata_diagram

主要特点如下:

  • 安全性: 使用专用内核,提供了网络、IO和内存的独立,在虚拟化VT扩展的基础上利用硬件强制隔离
  • 性能: 提供与标准Linux容器一致的性能;提高隔离度,而无需增加标准虚拟机的性能。
  • 兼容性: 支持行业标准,包括OCI容器格式,Kubernetes CRI接口以及旧版虚拟化技术。
  • 简单: 消除了在完整的虚拟机内部嵌套容器的要求;标准接口使插入和入门变得容易

下图是Kata Containers的架构:

katacontainers_architecture_diagram

Kubernetes可以通过Hypervisor VSOCK Socket和容器交互。

gVisor

gVisor提供的是一个沙箱容器环境,可以说是传统容器技术和虚拟机容器技术的折中。它使用Go编写了一个可以作为普通非特权进程运行的内核,这个内核实现了大多数的系统调用。所以相比于namespace和cgroup实现的容器,它可以屏蔽掉容器内应用程序的内核调用。相比于虚拟机实现的容器,它更轻量级(作为系统的一个进程运行)。

gvisor

理解kubernetes service

理解service的角度

这篇文章不是关于如何使用kubernetes中的service,而是尝试整理我自己对service的看法,然后加深对service的理解。那么,我是从哪几个角度去看待service呢?

  • service是服务的稳定性保证
  • service是集群中的load balance
  • 通过无selector的service去理解VIP(虚拟ip)
  • service的设计,和不同实现方式的性能

service是服务的稳定性保证

在k8s集群中,无状态的pod副本是可以随时删除、随时创建的,并且重新创建的pod不再保留旧的pod的任何信息,包括ip地址。在这样的情况下,前端应用如何使用后端的这些pod来提供服务就成了问题,因此k8s实现了service这样一个抽象的概念。对于有selector的service,它在被创建的时候会自动创建endpoint资源,这个endpoint中包含了所有的pod的ip和端口,并且在之后的pod的删除、创建中,这个endpoint中会立即更新相关pod的ip和端口信息。同时,service的ip地址是永远固定的,service和endpoint是一一对应的关系。这样,如果前端应用通过固定的service ip来访问pod提供的服务,那么就可以在endpoint中找到一个可用的pod的ip和端口,然后通过一些操作(这个在后面会整理)将数据包转发到指定的pod上即可。

# 你可以通过kubectl查看service和endpoint来加深理解

kubectl -n h2o describe svc h2o

Name:              h2o
Namespace:         h2o
Labels:            app=h2o
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"h2o"},"name":"h2o","namespace":"h2o"},"spec":{"clusterIP...
Selector:          app=h2o
Type:              ClusterIP
IP:                None
Port:              web  54321/TCP
TargetPort:        54321/TCP
Endpoints:         10.42.1.33:54321,10.42.2.139:54321
Session Affinity:  None
Events:            <none> kubectl -n h2o get endpoints h2o
NAME   ENDPOINTS                            AGE
h2o    10.42.1.33:54321,10.42.2.139:54321   4h45m

service通过ip地址的固定来保证服务的稳定性。那为啥service就是可以固定不变的呢?这是因为service本身就是一个抽象的概念啊,它不是一个正在运行的进程,只是一条数据,也正因为如此,它的ip地址和端口号也是不存在的,这些都是存储在etcd中的一条数据。那么k8s是如何通过这样一个虚假的ip和端口将请求转发到真实存在的pod中呢?这就是后面要说的内容了。

service是集群中的load balance

在上一节说到,一个service会对应一个endpoint,这个endpoint中会保存所有当前匹配到的pod的ip和端口号。那么现在有一个http请求过来了,发现endpoint中有三个待选的pod,那么我们使用一定的方式比较公平的选择出一个pod,就轻松的达到了负载均衡的效果。

service load balance

那么k8s中,load balance的策略是什么样的呢?因为不同的service实现方式使用的方法不同,这个内容会在后面整理。

通过无selector的service去理解VIP(虚拟ip)

在前面的内容中,service一直和endpoint、pod关联在一起,那么如果我们的service没有selector,就不会创建endpoint了,也不会关联pod。前面也提到了service是一个抽象的概念,其拥有的ip和port都是假的。其实这个叫做VIP(virtual ip)。那么,如何通过无selector的service来理解VIP呢?

在k8s中创建无selector service的时候,不会自动创建关联的endpoint,更不会去匹配pod了。但是这样的service仍然是拥有ip和port的。我们可以尝试一下:

svc-without-selector.yaml

apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 8081
      targetPort: 8081
kubectl apply -f svc-without-selector.yaml

查看一下这个svc的详情:

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         <none>
Session Affinity:  None
Events:            <none>

除了拥有ip和端口号,就什么都没有了。这就是说service为什么就是一条数据的原因,10.43.12.208也就是一个VIP。

对于无selector的service还有一个用处,就是让集群内部的应用可以稳定的访问到集群外部的服务。因为service是稳定的,那么集群内部都可以访问这个service,然后让这个service将请求转发到集群外。

这里我们可以手动创建一个endpoint,这个endpoint包含了集群外的两个http服务

apiVersion: v1
kind: Endpoints
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 192.168.50.99
      - ip: 192.168.50.201
    ports:
      - port: 8081

然后我们先检查一下service,发现endpoints已经更新了。

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         192.168.50.201:8081,192.168.50.99:8081
Session Affinity:  None
Events:            <none>

我们在集群内部访问一下(使用kubectl exec到一个pod上):

$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1
$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1

service的设计,和不同实现方式的性能

service的设计是以提高性能为前提不断的演进的,这里是关于Service的设计讨论: DESIGN: Services v2。感兴趣的还可以看看k8s-release-v1.0的时候对service的描述: Service

service的设计中有4个角色: Pod、 Service、Ambassador、Portal

  • Pod: k8s集群中的最小调度单位,包含一个或多个容器
  • Service: 一组pod的集合,由标签选择器来关联
  • Ambassador: 中文翻译是大使,是一段可执行的逻辑,它负责实现客户端访问Service,然后将请求转发到一个对应的Pod上。这个Ambassador可以是一个云服务商的服务,也可以是一个单独的pod(比如haproxy),或者是每个节点都有的共享进程(kube-proxy)。
  • Portal: 固定的ip:port对,客户端只要访问这个Portal,请求自然会被转发到Ambassador上,客户端不需要理解Ambassador的具体实现。

最初的设计中是有三种方案,

方案一: 每个服务一个ip,共享的Ambassador。这个ip就是上面说的Portal ip。将服务以及ip、端口广播给所有的kube-proxy实例。kube-proxy设置好iptables来“窃取”所有到Portal(ip,port)的请求,然后将这个请求转发到自己的某个端口上。这里kube-proxy扮演的是Ambassador角色,它会使用round-robin的方法来把请求均衡的分发到Service后面的Pod上。这个方案里,有以下的优点和缺点:

优点:
– 不会有端口冲突
– Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
– iptables可以放在root namespace,即使pods重启了也不需要更新iptables(这是因为iptables是负责将到service ip:port的流量转发到kube-proxy的一个端口上即可)。
– 不需要在pod上预先声明需要的Service。

缺点:

  • kube-proxy是多租户的(需要为所有的service做流量转发)
  • 从kube-proxy转发的流量的源ip不是真实的源ip,
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 当service数量级上千后可扩展性不高

方案二: 每个服务一个ip,私有的Ambassador。对每个pod来说,都有一个私有的的ambassador,这要求pod需要先声明它们想先访问那个服务(否则的话,对于集群中的每次Service的添加和删除,都需要kubelet或其他的root-namespace、true-root的用户代理变动到每个pod的namespace下。[iptables规则需要root用户]),这样才能在pod的命名空间下建立iptables规则。

优点:

  • 不会有端口冲突
  • Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
  • 代理不是多租户的
  • 从kube-proxy转发的流量的源ip是真实的源ip,
  • 容易从方案一迁移
  • 需要pod预先声明服务(结构良好)

缺点:

  • iptables是配置在pod的namespace下,但是pod的命名空间重启了就必须重新运行一次
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 需要pod预先声明服务(目前还没实现)

方案三:localhost的portal,私有的ambassador

不同于给service分配ip,而是使用本地的端口号作为portal。

介绍完这三种方案后,就可以引入service最终的演进了: userspace->iptables->ipvs。

这里先放一张iptables的工作流程图,方便理解:

iptables

userspace模式

这里的userspace就是方案一的实现,在k8s 1.0的发布中正式启用。userspace的工作原理图如下:

userspace service overview

这种模式,kube-proxy 会监视 Kubernetes master 对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会在本地 Node 上打开一个端口(随机选择)。 任何连接到“代理端口”的请求,都会被代理到 Service 的backend Pods 中的某个上面(如 Endpoints 所报告的一样)。 使用哪个 backend Pod,是 kube-proxy 基于 SessionAffinity 来确定的。

最后,它安装 iptables 规则,捕获到达该 Service 的 clusterIP(是虚拟 IP)和 Port 的请求,并重定向到代理端口,代理端口再代理请求到 backend Pod。默认情况下,用户空间模式下的kube-proxy通过round-robin选择后端。

这里有一个问题在于,client访问service的clusterIP时,iptables会把流量转发到kube-proxy的某个端口上,这样的话,每次转发都有一个内核态用户态的转换。

iptables模式

iptables service overview

这种模式,kube-proxy 会监视 Kubernetes 控制节点对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会安装 iptables 规则,从而捕获到达该 Service 的 clusterIP 和端口的请求,进而将请求重定向到 Service 的一组 backend 中的某个上面。 对于每个 Endpoints 对象,它也会安装 iptables 规则,这个规则会选择一个 backend 组合。

默认的策略是,kube-proxy 在 iptables 模式下随机选择一个 backend。类似于这样

iptables -t nat -A PREROUTING -p tcp -d 15.45.23.67 --dport 80 -j DNAT --to-destination 192.168.1.1-192.168.1.10

使用 iptables 处理流量具有较低的系统开销,因为流量由 Linux netfilter 处理,而无需在用户空间和内核空间之间切换。 这种方法也可能更可靠。

如果 kube-proxy 在 iptable s模式下运行,并且所选的第一个 Pod 没有响应,则连接失败。 这与用户空间模式不同:在这种情况下,kube-proxy 将检测到与第一个 Pod 的连接已失败,并会自动使用其他后端 Pod 重试。

您可以使用 Pod readiness 探测器 验证后端 Pod 可以正常工作,以便 iptables 模式下的 kube-proxy 仅看到测试正常的后端。 这样做意味着您避免将流量通过 kube-proxy 发送到已知已失败的Pod。

ipvs模式

ipvs是在Kubernetes v1.11正式可用的。ipvs也是依赖于iptables的,但是它的性能更高。

ipvs service overview

在ipvs模式下,kube-proxy监视Kubernetes服务和端点,调用netlink接口相应地创建IPVS规则,并定期将IPVS规则与Kubernetes服务和端点同步。该控制循环可确保IPVS状态与所需状态匹配。访问服务时,IPVS 将流量定向到后端Pod之一。

IPVS代理模式基于类似于iptables模式的netfilter挂钩函数,但是使用哈希表作为基础数据结构,并且在内核空间中工作。 这意味着,与iptables模式下的 kube-proxy 相比,IPVS 模式下的 kube-proxy 重定向通信的延迟要短,并且在同步代理规则时具有更好的性能。与其他代理模式相比,IPVS 模式还支持更高的网络流量吞吐量。

IPVS提供了更多选项来平衡后端Pod的流量。 这些是:

  • rr: round-robin
  • lc: least connection (smallest number of open connections)
  • dh: destination hashing
  • sh: source hashing
  • sed: shortest expected delay
  • nq: never queue

注意:
要在IPVS模式下运行kube-proxy,必须在启动kube-proxy之前使IPVS Linux在节点上可用。

当 kube-proxy 以 IPVS 代理模式启动时,它将验证 IPVS 内核模块是否可用。 如果未检测到 IPVS 内核模块,则 kube-proxy 将退回到以 iptables 代理模式运行。

ipvs在同步规则、网络带宽、cpu/内存消耗上都明显优于iptables,关于具体的性能数据可以看这篇文章: 华为云在 K8S 大规模场景下的 Service 性能优化实践。ipvs的详细介绍可以看这篇文章:ipvs 基本介绍。ipvs和iptables的对比:kube-proxy 模式对比:iptables 还是 IPVS?