[Gaia Scheduler] gpu-manager 启动流程分析

概述

Gaia scheduler 是腾讯开源的在 Kubernetes 集群中做 GPU 虚拟化的方案,实现了为容器分配虚拟化 GPU 资源并加以限制,它的最大的优势就是不需要特殊的硬件支持,并且性能损耗很小。关于它的论文,地址在这里:Gaia Scheduler: A Kubernetes-Based Scheduler Framework。如果想要理解这个项目,强烈建议先读这篇论文。

Gaia Scheduler 可以分为 4 个组件:

  • GPU Manager: 作为 device plugin 向 kubelet 注册。共注册了两个设备,包括 vcore 和 vmemory,支持两种计算资源:tencent.com/vcuda-coretencent.com/vcuda-memory,分别用来做 GPU 计算资源和 GPU 内存资源的请求和限制。

  • GPU Scheduler: 这里的 scheduler 并不是 kubernetes 的调度器,是 GPU Manager 在收到 kubelet 的 Allocate 调用后,它需求将设备挂载给容器。为了实现最佳的 GPU 挂载,就有这样一个专门的 Scheduler 来根据节点上当前的 GPU 拓扑和资源占用情况进行调度。

  • vGPU Manager: vGPU Manager 是具体负责管理容器的组件,包括监控容器状态,传递配置,和容器内的vGPU Library通信,以及在容器死亡后进行回收操作。

  • vGPU Library: vGPU Library 虽然相关的代码量不多,但它是 Gaia Scheduler 最重要的部分。因为它是实现 GPU 虚拟化的核心。通过覆盖容器中的 LD_LIBRARY_PATH 以及自定义了 libcuda-control.so 实现对 CUDA API 的拦截。

Gaia Scheduler 主要由三个项目组成: gpu-managervcuda-controllergpu-admission。但是这里的 gpu-manager 是 Gaia Scheduler 的主要实现,包含了上述的 4 个组件,vcuda-controller 就是 vGPU Library,已经被打包到了 gpu-manager 这个项目中。gpu-manager 需要配合 gpu-admission 项目来完成 GPU Scheduler 的工作。不要因此产生误解。下文中我们主要就 gpu-manager 这个项目进行分析。

启动流程分析

gpu-manager 本身主要作为 kubernetes 的 device plugin 来实现的,定义了两种设备: vcuda-corevcuda-memory,我们的应用通过 pod 的资源字段进行申请,然后 kube-scheduler 会根据节点上的资源状态进行调度。因此,你最好还需要了解 kubernetes 的 device plugin 的开发知识。关于 device plugin 的开发,可以看之前的一篇文章:Kubernetes开发知识–device-plugin的实现

启动参数

分析一个项目从启动参数开始,可以帮助我们快速了解:

  • driver: 这个是 GPU 的驱动,当前的默认值是 nvidia,很显然该项目可以扩展支持其他类型的 GPU。
  • extra-config: 额外的配置,这个参数暂时看不出来有什么特别
  • volume-config: 这里的 volume 指的是一些动态链接库和可执行文件的位置。也就是 gpu-manager 需要拦截调用的一些库
  • docker-endpoint: 用来挂载到容器中和 docker 做通信的,默认位置是 unix:////var/run/docker.sock
  • query-port: 统计信息服务的查询接口
  • query-port: 统计信息服务的监听地址
  • kubeconfig: 用来授权的配置文件
  • standalone: 暂时还不清楚的参数
  • sample-period: gpu-manager 会查询 gpu 设备的使用情况,这个参数用来设定采样周期
  • node-labels: 给节点自动打标签
  • hostname-override: gpu-manager 在运行时,只关注自己节点上的 pod,这主要是通过 hostname 来辨认的
  • virtual-manager-path: gpu-manager 会为所有需要虚拟 gpu 资源的 pod 创建唯一的文件夹,文件夹的路径就在这个地址下。
  • device-plugin-path: kubernetes 默认的 device plugin 的目录地址
  • checkpoint-path: gpu-manager 会产生 checkpoint 来当缓存用
  • share-mode: gpu-manager 最大的特点就是将一个物理 gpu 分成多个虚拟 gpu,也就是共享模式
  • allocation-check-period: 检查分配了虚拟 gpu 资源的 pod 的状态,及时回收资源
  • incluster-mode: 是否在集群内运行

服务启动

gpu-manager 推荐的部署方案是通过 kubernetes 的 daemonset,然后配置 node selector 调度到指定的节点上。然后 gpu-manager 就开始在指定节点上启动了。

srv := server.NewManager(cfg)
go srv.Run()

这里,我们需要看一下这个 srv 的具体实现,首先是它的结构体:

type managerImpl struct {
    config *config.Config

    allocator      allocFactory.GPUTopoService     // gpu 容器调度分配
    displayer      *display.Display                // gpu 使用情况可视化服务
    virtualManager *vitrual_manager.VirtualManager // 负责管理 vgpu

    bundleServer map[string]ResourceServer
    srv          *grpc.Server
}

config 包含了我们上面的所有参数,就不进去细看了。

allocator 负责在容器调度到节点上后,为其分配具体的设备资源。allocator 实现了探测节点上的 gpu 拓扑架构,然后以最佳性能,最少碎片为目的使用最优的方案进行资源分配。

displayer 是将 gpu 的使用情况输出,方便我们查看。

virtualManager 负责 vgpu 分配后的管理工作。

bundleServer 包含 vcore,vmemory,我们上面提到这两种资源以 device plugin 的方式进行注册,因此他们需要启动 grpc server。

srv: 将 gpu display server 注册到这个 grpc server 中。

接下来,我们就可以分析 srv.Run() 方法具体执行了哪些内容。为了先对整个流程有个大概的印象,我将内容整理成以下条目:

  • 启动 volumeManager,将节点上和 nvidia gpu (包括cuda) 的所有可执行文件和库移动到 /etc/gpu-manager/vdriver 中。并且将关键的库替换成 vcuda-control,实现 cuda 调用的拦截。
  • watchdog 创建 pod 缓存并监控 pod,之后所有关于 pod 的操作都来源于这里。
  • watchdog 给节点打上标签
  • 启动 virtualManager
  • gpu 拓扑结构感知。
  • 初始化资源分配器
  • 设置 vcuda, vmemory, display 的 grpc 服务
  • 启动 metrics 的 http 服务,主要是提供给 prometheus
  • 启动 vcuda,vmemory 的 grpc 服务
  • 启动 display 的 grpc 服务

接下来,我们具体来分析每一步是如何做的。当然,这里只会挑一些重点的部分。

volumeManager 的启动

func (vm *VolumeManager) Run() (err error) {
    // ldcache 是动态链接库的缓存信息
    cache, err := ldcache.Open()
    defer func() {
        if e := cache.Close(); err == nil {
            err = e
        }
    }()
    vols := make(VolumeMap)
    for _, cfg := range vm.Config {
        vol := &Volume{
            Path: path.Join(cfg.BasePath, cfg.Name),
        }

        if cfg.Name == "nvidia" {
            // nvidia 库的位置
            types.DriverLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        } else {
            // origin 库的位置
            types.DriverOriginLibraryPath = filepath.Join(cfg.BasePath, cfg.Name)
        }

        for t, c := range cfg.Components {
            switch t {
            case "binaries":
                // 调用 which 来查找可执行文件的位置
                bins, err := which(c...)
                // 将实际位置存起来
                vol.dirs = append(vol.dirs, volumeDir{binDir, bins})
            case "libraries":
                // 是库的话,就从 ldcache 里面去找
                libs32, libs64 := cache.Lookup(c...)
                // 将 library 位置存起来
                vol.dirs = append(vol.dirs, volumeDir{lib32Dir, libs32}, volumeDir{lib64Dir, libs64})
            }
            vols[cfg.Name] = vol
        }
    }
    // 找到了需要的库位置之后,做 mirror 处理
    if err := vm.mirror(vols); err != nil {
        return err
    }
    return nil
}

这段代码的前半部分都是在查找指定的动态链接库和可执行文件,这些文件是在 volume.conf 这个配置文件中指定的,通过参数传进来。查找动态链接库时,使用的是 ldcache,查找可执行文件时,使用了系统的 which 指令。找到之后会将其所在位置记录下来。接着就是对找到的库做 mirror 处理。

func (vm *VolumeManager) mirror(vols VolumeMap) error {
    // nvidia 和 origin
    for driver, vol := range vols {
        if exist, _ := vol.exist(); !exist {
            // 这里的path是/etc/gpu-manager/vdriver下面
            if err := os.MkdirAll(vol.Path, 0755); err != nil {
                return err
            }
        }
        for _, d := range vol.dirs {
            vpath := path.Join(vol.Path, d.name)
            // 创建 bin lib lib64
            if err := os.MkdirAll(vpath, 0755); err != nil {
                return err
            }

            // For each file matching the volume components (blacklist excluded), create a hardlink/copy
            // of it inside the volume directory. We also need to create soname symlinks similar to what
            // ldconfig does since our volume will only show up at runtime.
            for _, f := range d.files {
                glog.V(2).Infof("Mirror %s to %s", f, vpath)
                if err := vm.mirrorFiles(driver, vpath, f); err != nil {
                    return err
                }

                if strings.HasPrefix(path.Base(f), "libcuda.so") {
                    driverStr := strings.SplitN(strings.TrimPrefix(path.Base(f), "libcuda.so."), ".", 2)
                    types.DriverVersionMajor, _ = strconv.Atoi(driverStr[0]) // 驱动版本号
                    types.DriverVersionMinor, _ = strconv.Atoi(driverStr[1])
                    glog.V(2).Infof("Driver version: %d.%d", types.DriverVersionMajor, types.DriverVersionMinor)
                }

                if strings.HasPrefix(path.Base(f), "libcuda-control.so") {
                    vm.cudaControlFile = f
                }
            }
        }
    }

    vCudaFileFn := func(soFile string) error {
        if err := os.Remove(soFile); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, soFile); err != nil {
            return err
        }

        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, soFile)

        l := strings.TrimRight(soFile, ".0123456789")
        if err := os.Remove(l); err != nil {
            if !os.IsNotExist(err) {
                return err
            }
        }
        if err := clone(vm.cudaControlFile, l); err != nil {
            return err
        }
        glog.V(2).Infof("Vcuda %s to %s", vm.cudaControlFile, l)
        return nil
    }

    if vm.share && len(vm.cudaControlFile) > 0 {
        if len(vm.cudaSoname) > 0 {
            for _, f := range vm.cudaSoname {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }

        if len(vm.mlSoName) > 0 {
            for _, f := range vm.mlSoName {
                if err := vCudaFileFn(f); err != nil {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码先会对所有上面查找到的库或可执行文件调用 mirrorFiles,但是记录下来了 libcuda.so 的版本号和 libcuda-control.so 的位置。注意,这个 libcuda-control 就是 vcuda-control 项目生成的用来拦截 cuda 调用的库。

然后将 cudaControlFile clone到所有 cudaSonamemlSoName 中库的位置。这个 clone 方法会先尝试硬链接过去,如果失败就直接复制过去。这里的 cudaControlFile 就是我们上面所说的 libcuda-control.so 啦。cudaSonamemlSoName 包含了所有需要被拦截调用的库。这样子就实现了拦截所有的 cuda 调用。下面我们在看一下 mirrorFiles 这个方法就可以了。

// driver 是配置文件中的 "nvidia" 或 "origin"
// vpath 是要 mirror 到的位置,在 /etc/gpu-manager/vdriver 下面
func (vm *VolumeManager) mirrorFiles(driver, vpath string, file string) error {
    // In computing, the Executable and Linkable Format (ELF, formerly named Extensible Linking Format), is a common standard file format for executable files, object code, shared libraries, and core dumps
    obj, err := elf.Open(file)
    defer obj.Close()

    // 黑名单机制,具体用处还不清楚,跟 nvidia 的驱动相关
    ok, err := blacklisted(file, obj)
    if ok {
        return nil
    }
    l := path.Join(vpath, path.Base(file))
    // 不管有没有,先尝试把 gpu-manager 里面的移除
    if err := removeFile(l); err != nil {
        return err
    }
    // clone 优先硬连接,其次是复制文件到指定位置
    if err := clone(file, l); err != nil {
        return err
    }
    // 从 elf 中获取当前库的 soname
    soname, err := obj.DynString(elf.DT_SONAME)
    if len(soname) > 0 {
        // 将获取到 soname 组成路径
        l = path.Join(vpath, soname[0])
        // 如果文件和它的soname不一致(是否可以认为这个文件是软链接过去的)
        if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
            return err
        }

        // XXX Many applications (wrongly) assume that libcuda.so exists (e.g. with dlopen)
        // Hardcode the libcuda symlink for the time being.
        if strings.Contains(driver, "nvidia") {
            // 这里为什么要移除 libcuda.so 和 libnvidia-ml.so 的软链接
            // 因为gpu调用会涉及到这两个库,这两个库会软链接到真实的库上。移除后替换成拦截的库
            // Remove libcuda symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libcuda.so") {
                os.Remove(l)
                vm.cudaSoname[l] = l
            }

            // Remove libnvidia-ml symbol link
            if vm.share && driver == "nvidia" && strings.HasPrefix(soname[0], "libnvidia-ml.so") {
                os.Remove(l)
                vm.mlSoName[l] = l
            }

            // XXX GLVND requires this symlink for indirect GLX support
            // It won't be needed once we have an indirect GLX vendor neutral library.
            if strings.HasPrefix(soname[0], "libGLX_nvidia") {
                l = strings.Replace(l, "GLX_nvidia", "GLX_indirect", 1)
                if err := linkIfNotSameName(path.Base(file), l); err != nil && !os.IsExist(err) {
                    return err
                }
            }
        }
    }

    return nil
}

这段代码中,先使用 blacklisted 排除一些不需要处理的库,然后尝试将库或可执行文件 clone 到我们的 /etc/gpu-manager/vdriver 下面。/etc/gpu-manager/vdriver 下面有两个文件夹,一个是 nvidia,保存了已经被我们拦截的库,一个是 origin,这里面是原始的未处理的库。同时,还将 libcuda.so 和 libnvidia-ml.so 移除了,这样就调用不到真实的库了,转而在之后用我们拦截的库来替换这几个文件。

至此,volumeManager 分析结束。

gpu 拓扑结构感知

关于 gpu 拓扑结构这一块,主要是为了在之后做资源分配时选择最优方案用的。腾讯也有分享过这一块的资料(腾讯基于 Kubernetes 的企业级容器云实践):

gpu 拓扑结构

这里不影响我们理解整个工作机制,所以先不分析。

初始化资源分配器

// 分配器,根据driver调用相应的分配器
initAllocator := allocFactory.NewFuncForName(m.config.Driver)
if initAllocator == nil {
    return fmt.Errorf("can not find allocator for %s", m.config.Driver)
}

m.allocator = initAllocator(m.config, tree, client)

这里的 initAllocator 对应的方法是:

//NewNvidiaTopoAllocator returns a new NvidiaTopoAllocator
func NewNvidiaTopoAllocator(config *config.Config, tree device.GPUTree, k8sClient kubernetes.Interface) allocator.GPUTopoService {
    runtimeRequestTimeout := metav1.Duration{Duration: 2 * time.Minute}
    imagePullProgressDeadline := metav1.Duration{Duration: 1 * time.Minute}
    dockerClientConfig := &dockershim.ClientConfig{
        DockerEndpoint:            config.DockerEndpoint,
        RuntimeRequestTimeout:     runtimeRequestTimeout.Duration,
        ImagePullProgressDeadline: imagePullProgressDeadline.Duration,
    }

    _tree, _ := tree.(*nvtree.NvidiaTree)
    cm, err := checkpoint.NewManager(config.CheckpointPath, checkpointFileName)
    if err != nil {
        glog.Fatalf("Failed to create checkpoint manager due to %s", err.Error())
    }
    alloc := &NvidiaTopoAllocator{
        tree:              _tree,
        config:            config,
        evaluators:        make(map[string]Evaluator),
        dockerClient:      dockershim.NewDockerClientFromConfig(dockerClientConfig),
        allocatedPod:      cache.NewAllocateCache(),
        k8sClient:         k8sClient,
        queue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        stopChan:          make(chan struct{}),
        checkpointManager: cm,
    }

    // Load kernel module if it's not loaded
    alloc.loadModule()

    // Initialize evaluator
    alloc.initEvaluator(_tree)

    // Read extra config if it's given
    alloc.loadExtraConfig(config.ExtraConfigPath)

    // Process allocation results in another goroutine
    go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan)

    // Recover
    alloc.recoverInUsed()

    // Check allocation in another goroutine periodically
    go alloc.checkAllocationPeriodically(alloc.stopChan)

    return alloc
}

allocator 调用 loadModule() 来启用 nvidia 的内核模块。

调用 initEvaluator(_tree) 来初始化评估器,这里的 _tree 就是感知到的 gpu 拓扑结构。

调用 loadExtraConfig(config.ExtraConfigPath) 来加载启动时传入的额外参数配置文件。

go wait.Until(alloc.runProcessResult, time.Second, alloc.stopChan) 创建了新的协程来处理分配结果。

recoverInUsed() 是恢复 gpu 分配结果。比如在 gpu-manager 重启之后,之前的 gpu 分配结果都丢失了,但是节点上还有大量的容器正在占用 gpu,这个方法会通过查找节点上存活的容器,通过 docker endpoint, 调用 InspectContainer 获取容器中占用的 device id,然后标记该设备和容器之间的占用关系。

go alloc.checkAllocationPeriodically(alloc.stopChan) 创建新的协程来周期性的检查资源分配情况。如果是 Failed 和 Pending 状态的容器,就根据错误信息检查是否应该删除它们,然后如果这些 pod 的控制器是 deployment 类似的,就尝试删除它们,这样控制器会重新创建这些 pod 进行调度,让这些 pod 恢复到正常运行状态。

启动各种服务

vcuda,vmemory 的 grpc 服务是 device plugin 的机制。metrics service 是提供给 prometheus 调用的,以监控该节点的相关信息。display 服务会打印 gpu 拓扑结构的相关信息。

Device plugin 的注册

Device plugin

这张图是 device plugin 注册的时序图。gpu-manager 的注册方法是:

func (m *managerImpl) RegisterToKubelet() error {
    socketFile := filepath.Join(m.config.DevicePluginPath, types.KubeletSocket)
    dialOptions := []grpc.DialOption{grpc.WithInsecure(), grpc.WithDialer(utils.UnixDial), grpc.WithBlock(), grpc.WithTimeout(time.Second * 5)}

    conn, err := grpc.Dial(socketFile, dialOptions...)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)

    for _, srv := range m.bundleServer {
        req := &pluginapi.RegisterRequest{
            Version:      pluginapi.Version,
            Endpoint:     path.Base(srv.SocketName()),
            ResourceName: srv.ResourceName(),
            Options:      &pluginapi.DevicePluginOptions{PreStartRequired: true},
        }

        glog.V(2).Infof("Register to kubelet with endpoint %s", req.Endpoint)
        _, err = client.Register(context.Background(), req)
        if err != nil {
            return err
        }
    }

    return nil
}

这里分别注册了 vcuda 和 vmemory。vcuda 和 vmemory 的 Allocate 方法都指向了同一个方法,写在了 service/allocator/nvidia/allocator.go 中。

至此,gpu-manager 的启动流程结束。接下来的 gpu-manager 的职责就是等待 kubelet 通过 grpc 的调用,在容器调度到节点的时候进行资源设备的分配,必要目录的挂载等工作了。具体的可以见下一篇文章

最后,提供一个简单的脑图帮助理解:

gpu-manager-arch

MySQL 事务隔离性探究

概述

MySQL 的事务提供了 ACID 四个特性,其中隔离性是较复杂的一个特性。SQL 标准定义了四种隔离级别,每一种隔离级别都规定了事务中修改对于其他事务的可见性。一般来说,较低的隔离通常可以带来更高的并发。四种隔离级别分别是: 未提交读(READ UNCOMMITTED),已提交读(READ COMMITTED)/不可重复读(NONREPEATABLE READ),可重复读(REPEATABLE READ),可串行化(SERIALIZABLE)。下面的说明仅对 InnoDB 引擎保证准确。

四种隔离级别

关于四种隔离级别,在《高性能 MySQL> 中已经有了很好的阐述,这里简单地陈述出来。

未提交读

在未提交读级别,事务中的修改,即使没有提交,对于其他事务也都是可见的。事务可以读取未提交的数据,这也称为脏读(Dirty Read)。很明显,未提交读等同于未做任何的事务隔离,因此是最低的隔离级别。一般情况下,也没有什么可用的场景。

已提交读

已提交读是相对于未提交读来说的,主要是实现了在事务中未提交的修改,对于其他事务是不可见的。但是这种隔离级别会造成一个问题,在一次事务中多次读取会出现不一样的结果,所以也称为不可重复读。想要更轻松的理解不可重复读,可以看下面的例子:

iso1

事务A期间,事务B提交了一次更新,这会导致事务A中两次查询 id 为 1 的数据,第一次 score 是 89,第二次 score 是 29。我们也可以用 sql 语句来验证一下:

mysql> show create table score;

+-------+--------------------------------------------------------------------+
| Table | Create Table                                                       |
+-------+--------------------------------------------------------------------+
| score | CREATE TABLE `score` (                                             |
|       |   `id` int(11) NOT NULL AUTO_INCREMENT,                            |
|       |   `name` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL,          |
|       |   `score` int(11) NOT NULL,                                        |
|       |   PRIMARY KEY (`id`)                                               |
|       | ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci |
+-------+--------------------------------------------------------------------+

终端 A :

mysql> set autocommit=0;
mysql> set session transaction isolation level read committed;
mysql> begin;
mysql> select * from score where id = 1;

+----+------+-------+
| id | name | score |
+----+------+-------+
| 1  | Bob  | 89    |
+----+------+-------+

终端 B :

mysql> set autocommit=0;
mysql> set session transaction isolation level read committed;
mysql> begin;
mysql> update score set score=29 where id = 1;
mysql> commit;

终端 A :

mysql> select * from score where id = 1;

+----+------+-------+
| id | name | score |
+----+------+-------+
| 1  | Bob  | 29    |
+----+------+-------+

mysql> commit;

可重复读

可重复读是 MySQL 的默认隔离界别,保证了在同一个事务中多次读取同样的记录结果是一致的。但是在理论上,可重复读隔离级别还是无法解决另外一个幻读(Phantom Read)的问题。所谓幻读,指的是当某个事务在读取某个范围内的记录时,会产生换行(Phantom Row)。InnoDB 通过多版本控制(MVCC,Multiversion Concurreny Control)解决了幻读的问题。但是对于可重复读,有一个让我比较不确定的场景:

iso2

按道理说,在事务 A 内 score 应该是一致的,事务 B 提交的 score 是不可见的,也就是 89,所以 id 为 1 的数据应该是:


+----+------+-------+ | id | name | score | +----+------+-------+ | 1 | Alice| 29 | +----+------+-------+

事实真的如此吗?我们用 MySQL 验证一下:

终端 A:

mysql> set session transaction isolation level repeatable read;
mysql> begin;
mysql> select * from score where id = 1;

终端 B:

mysql> set session transaction isolation level repeatable read;
mysql> begin;
mysql> update score set score=29 where id = 1;
mysql> commit

终端 A:

mysql> update score set name='Alice' where score=89 and id = 1;
mysql> commit;
mysql> select * from score where id = 1; 

+----+------+-------+
| id | name | score |
+----+------+-------+
| 1  | Bob  | 29    |
+----+------+-------+

发现我的想法是错的。所以如何正确理解这里所说的可重复读呢?应该是仅指在 SELECT 的时候,因为 MySQL 事务的可重复读隔离级别下,会使用 MVCC,此时 SELECT 只查找版本早于当前事务版本的数据行,所以保证了一次事务内的可重复读。而 INSERT、DELETE、UPDATE 均会使用当前系统版本号的最新数据。

可串行化

可串行化是最高的隔离级别。它通过强制事务串行执行,避免了前面说的幻读问题。简单来说,SERIALIZABLE 会在读取的每一行数据都加锁,所以可能导致大量的超时和锁争用问题。实际应用中也很少用到这个隔离级别。

Kubernetes开发知识–device-plugin的实现

什么是 device plugin

Kubernetes 作为一个自动化容器编排系统,在调度 pod 的时候会根据容器需要的资源进行节点的选择,节点的选择会分为预选和优选阶段。预选阶段会根据所有节点上剩余的资源量与 pod 需要的资源量进行对比,选出能够满足需求的节点。通常情况下,这里的资源都会包括 CPU 和 Memory,就像下面这样:

resources:
  requests:
    memory: "64Mi"
    cpu: "250m"
  limits:
    memory: "128Mi"
    cpu: "500m"

但是现在我们有了新的需求,我们有一个 tensorflow 的模型需要借助 tensorflow serving 来部署,同时我们希望采用 GPU 部署的方案以加快模型的在线推算速度。这样我们就需要一个 GPU 的资源并借助 Kubernetes 的调度器将容器调度到有空余 GPU 资源的方案。

在 1.11 版本之前的 Kubernetes 中,提供了 alpha.kubernetes.io/nvidia-gpu 的资源名称来帮助我们根据 GPU 资源调度。但是这也带来了一些问题:

  • Kubernetes 需要维护 NVIDIA GPU 相关的代码,增加了维护成本。
  • NVIDIA GPU 方面的专家不一定熟悉 Kubernetes,这不符合让最擅长的人做最擅长的事的原则。
  • 除了 NVIDIA GPU,还会有其他的计算资源需要支持。

因此,Kubernetes 在 1.8 版本引入了 device plugin 机制,将第三方的计算资源通过插件的方式引入 Kubernetes,并且由第三方厂商自行维护。Kubernetes 社区的活瞬间就轻松了,第三方厂商也开心了。

通过以上的说明,可以总结出 device plugin 主要用来解耦第三方计算资源和 kubernetes 系统,将第三方的计算资源通过插件的方式引入 Kubernetes。当然 cpu 和 memory 除外,毕竟谁还能少了 CPU 和 Memory 呢。

device plugin 能做什么

目前一些常用的 device plugin 有:

我了解的还有腾讯的 Gaia Scheduler,通过 device plugin 实现的 GPU 虚拟化方案。如果 NVIDIA 的 GPU 方案还不够适合你,可以看看腾讯的这个方案:tkestack/gpu-manager

我觉得腾讯的 GPU 虚拟化方案是最能说明 device plugin 使用场景的例子,通过定义 tencent.com/vcuda-coretencent.com/vcuda-memory 这两个计算资源,来将一个物理 GPU 划分成多个虚拟 GPU 进行调度,这样可以实现一个 GPU 上部署多个 tensorflow serving。你不需要购买特殊的硬件或者修改任何 Kubernetes 的代码,就有了 GPU 虚拟化的能力。

下面我们开个脑洞,现在我们有了一种叫做 cola 的计算资源,可以提高程序的 IO 能力。但是 cola 的资源有限,只分配给特定的容器使用。这时候我们就可以通过实现自己的 device plugin 来满足这个需求。我们的计算资源名就叫做: myway5.com/cola,在下面一小节做具体的实现。

device plugin 的实现方案

device plugin 的工作原理其实不复杂。主要有以下步骤:

  • 首先 device plugin 可以通过手动或 daemonset 部署到需要的节点上。
  • 为了让 Kubernetes 发现 device plugin,需要向 kubelet 的 unix socket。 进行注册,注册的信息包括 device plugin 的 unix socket,API Version,ResourceName。
  • kubelet 通过 grpc 向 device plugin 调用 ListAndWatch, 获取当前节点上的资源。
  • kubelet 向 api server 更新节点状态来通知资源变更。
  • 用户创建 pod,请求资源并调度到节点上后,kubelet 调用 device plugin 的 Allocate 进行资源分配。

时序图如下:

device plugins

在 device plugin 的实现中,最关键的两个要实现的方法是 ListAndWatchAllocate。除此之外,还要注意监控 kubelet 的重启,一般是使用 fsnotify 类似的库监控 kubelet.sock 的重新创建事件。如果重新创建了,则认为 kubelet 是重启了,我们需要重新向 kubelet 注册 device plugin。

ListAndWatch

我们上面定义的 myway5.com/cola 资源用 /etc/colas 下的文件代表。每一个文件代表一个可用的资源。因此实现 ListAndWatch 就是查找该文件夹下的文件,然后添加到设备列表发送给 kubelet,之后调用 fsnotify 去监控文件的 CREATEREMOVE 事件。每次设备列表发生变更都重新向 kubelet 发送更新过的设备列表。

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (s *ColaServer) ListAndWatch(e *pluginapi.Empty, srv pluginapi.DevicePlugin_ListAndWatchServer) error {
    log.Infoln("ListAndWatch called")
    devs := make([]*pluginapi.Device, len(s.devices))

    i := 0
    for _, dev := range s.devices {
        devs[i] = dev
        i++
    }

    err := srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
    if err != nil {
        log.Errorf("ListAndWatch send device error: %v", err)
        return err
    }

    // 更新 device list
    for {
        log.Infoln("waiting for device change")
        select {
        case <-s.notify:
            log.Infoln("开始更新device list, 设备数:", len(s.devices))
            devs := make([]*pluginapi.Device, len(s.devices))

            i := 0
            for _, dev := range s.devices {
                devs[i] = dev
                i++
            }

            srv.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
        }
    }
}

Allocate

在用户创建的 Pod 请求资源时,Kubernetes 的调度器会进行调度,并通过 kubelet 向 device plugin 发出 Allocate 调用,这一步的调用主要是为了让 device plugin 为容器调度资源。 在调度成功后向 kubelet 返回调度结果即可。

// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
func (s *ColaServer) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    log.Infoln("Allocate called")
    resps := &pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        log.Infof("received request: %v", strings.Join(req.DevicesIDs, ","))
        resp := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "COLA_DEVICES": strings.Join(req.DevicesIDs, ","),
            },
        }
        resps.ContainerResponses = append(resps.ContainerResponses, &resp)
    }
    return resps, nil
}

部署

device plugin 可以手动部署到机器上,也可以通过 Daemonset 进行部署。这里当然是 Daemonset 进行部署了。部署的时候有几个注意事项:

  • 需要挂载 hostPath,其中 /var/lib/kubelet/device-plugins 是必须的。这个文件夹下有 kubelet.sock,以及我们也需要将 device plugin 的 unix socket 文件存在这里。使得 kubelet 可以和我们的应用通信。
  • 为 device plugin 的 Pod 设置调度优先级别,通常设置成 priorityClassName: "system-node-critical"。这样可以保证不会因为节点利用率过高被逐出。
  • 如果资源设备不是每台机器都有,建议使用 nodeSelector 将 device plugin 调度到指定的机器上。

device plugin 的开发源代码可以参考上面的 cola 例子:cola device plugin

部署结束之后,可以查看一下节点的资源情况:

$ kubectl describe nodes test
Capacity:
 cpu:                2
 ephemeral-storage:  17784752Ki
 hugepages-2Mi:      0
 memory:             1986740Ki
 myway5.com/cola:    2
 pods:               110
Allocatable:
 cpu:                2
 ephemeral-storage:  17784752Ki
 hugepages-2Mi:      0
 memory:             1986740Ki
 myway5.com/cola:    2
 pods:               110

创建一个 pod,请求 myway5.com/cola 资源:

$ kubectl apply -f e2e/pod-with-cola.yaml

然后查看一下 cola pod 的日志来了解设备发现和调度情况:

$ kubectl -n kube-system logs cola-thtm9 
time="2020-03-24T08:16:53Z" level=info msg="cola device plugin starting"
time="2020-03-24T08:16:53Z" level=info msg="find device 'cocacola'"
time="2020-03-24T08:16:53Z" level=info msg="find device 'peisicola'"
time="2020-03-24T08:16:53Z" level=info msg="watching devices"
time="2020-03-24T08:16:53Z" level=info msg="start GPPC server for 'myway5.com/cola'"
time="2020-03-24T08:16:53Z" level=info msg="Register to kubelet with endpoint cola.sock"
time="2020-03-24T08:16:53Z" level=info msg="register to kubelet successfully"
time="2020-03-24T08:16:53Z" level=info msg="ListAndWatch called"
time="2020-03-24T08:16:53Z" level=info msg="waiting for device change"
time="2020-03-24T08:17:10Z" level=info msg="Allocate called"

Kubernetes 开发知识–Kubernetes 准入控制与 admission webhook 的使用

一、什么是准入控制

我们都知道 Kubernetes 的最核心组件就是它的 API Server,所有资源的创建、更新和删除都是通过 API Server 进行的。我们可以通过 HTTP 请求或者 kubectl 这样的客户端来和 APIServer 通信,在我们操作对象的请求到达 API Server 之前,会先到达准入控制器这里。准入控制器可以执行“验证”和“变更”操作。因此我们可以认为有两种准入控制器:变更(mutating)准入控制器验证(validating)准入控制器

准入控制过程也同样分为两个阶段。第一阶段,运行变更准入控制器,对对象进行修改操作;第二阶段,运行验证准入控制器。如果任何一个阶段的任何控制器拒绝了该请求,则整个请求将立即被拒绝,并向终端用户返回一个错误。

我们可以通过下面的图来直观的感受一下:

Kubernetes 开发知识--Kubernetes 准入控制与 admission webhook 的使用

二、准入控制能做什么

准入控制存在的目的就是提高 Kubernetes 架构的灵活性。因此 Kubernetes 提供了一些准入控制器,并且允许你打开或关闭一部分,同时你也可以自定义准入控制器,来完成你自己的一些特殊需求。可以自定义的准入控制器也分为两种:一个叫 MutatingAdmissionWebhook,一个叫 ValidatingAdmissionWebhook,其实也是我们上面说的变更准入控制器验证准入控制器

Kubernetes 提供了一些常用的准入控制器,下面举几个例子:

  • AlwaysPullImages: 该准入控制器会修改每一个新创建的 Pod 的镜像拉取策略为 Always。这样在多租户集群里,用户就能保证自己的私有镜像只会在有凭证的情况下使用,而不会出现因为机器上缓存了镜像而被其他用户直接使用的情况。

  • NamespaceLifecycle:该准入控制器禁止在一个正在被终止的 Namespace 中创建新对象,并且确保使用不存在的 Namespace 的请求被拒绝。该准入控制器还会禁止删除三个系统保留的命名空间,即 defaultkube-systemkube-public

除了 Kubernetes 提供的这些准入控制器,我们可以编写 MutatingAdmissionWebhookValidatingAdmissionWebhook。比如我之前有写一个叫做 [lazykube](https://github.com/joyme123/lazykube)MutatingAdmissionWebhook,它的作用是对每一个创建的 Pod 的镜像源都进行校验,如果是 docker hub, gcr.io, quay.io 等地址的镜像,就修改成国内的代理源,这样就可以自动解决镜像需要翻墙下载的问题了。

三、如何使用 admission webhook 编写一个准入控制器

使用 admission webhook 编写一个准入控制器其实很简单,它就和你日常写 web server 一样:

// Start 启动服务
func (whsrv *WebhookServer) Start() error {
    mux := http.NewServeMux()
    mux.HandleFunc("/mutate", whsrv.serve)
    whsrv.server.Handler = mux

    if err := whsrv.server.ListenAndServeTLS("", ""); err != nil {
        return fmt.Errorf("Failed to listen and serve webhook server: %v", err)
    }

    return nil
}

在收到一个 AdmissionReview 请求的时候,将其中的 AdmissionRequest 对象携带的资源对象取出来进行校验或变更:

var body []byte
if r.Body != nil {
    if data, err := ioutil.ReadAll(r.Body); err == nil {
        body = data
    }
}

if len(body) == 0 {
    log.Error("empty body")
    http.Error(w, "empty body", http.StatusBadRequest)
    return
}

// verify the content type is accurate
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
    log.Errorf("Content-Type=%s, expect application/json", contentType)
    http.Error(w, "invalid Content-Type, expect `application/json`", http.StatusUnsupportedMediaType)
    return
}

var admissionResponse *v1beta1.AdmissionResponse
ar := v1beta1.AdmissionReview{}
if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
    log.Errorf("Can't decode body: %v", err)
    admissionResponse = &v1beta1.AdmissionResponse{
        Result: &metav1.Status{
            Message: err.Error(),
        },
    }
} else {
    admissionResponse = whsrv.mutate(&ar)
}

然后生成 AdmissionResponse, 通过 AdmissionReview 进行响应即可:

admissionReview := v1beta1.AdmissionReview{}
if admissionResponse != nil {
    admissionReview.Response = admissionResponse
    if ar.Request != nil {
        admissionReview.Response.UID = ar.Request.UID
    }
}

resp, err := json.Marshal(admissionReview)
if err != nil {
    log.Errorf("Can't encode response: %v", err)
    http.Error(w, fmt.Sprintf("could not encode response: %v", err), http.StatusInternalServerError)
}
log.Infoln("Ready to write response ...")
if _, err := w.Write(resp); err != nil {
    log.Errorf("Can't write response: %v", err)
    http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}

完整的代码可以参考上面的 lazykube 项目。

这样我们写好之后就可以开始部署了。我们需要告诉 Kubernetes 我们的 admission webhook 的地址,以及我们要操作的资源对象,因此我们创建一个 MutatingWebhookConfiguration 或者 ValidatingWebhookConfiguration 对象,比如 lazykube 就是对创建 Pod 进行变更操作:

apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: lazykube-webhook-cfg
  namespace: kube-system
  labels:
    app: lazykube
webhooks:
  - name: lazykube.myway5.com
    clientConfig:
      service:
        name: lazykube-webhook-svc
        namespace: kube-system
        path: "/mutate"
      caBundle: ××××××××
    rules:
      - operations: [ "CREATE" ]
        apiGroups: [""]
        apiVersions: ["v1"]
        resources: ["pods"]

然后为了让 Kubernetes 通过 HTTPS 进行通信,并信任我们的证书,我们需要让 Kubernetes 集群为我们的签发证书。具体的方法可以参考这里:kubernetes 集群中的证书签发。最后使用 Deployment 部署我们的服务即可。

kubernetes 集群中的证书签发

场景

在我们使用 kubernetes 的 admission webhook 机制实现一些集群资源认证、修改的方案时,会涉及到集群内部的 https 通信。这就涉及到我们的服务需要配置证书,并且要让 kubernetes 的组件信任该证书。我们都知道 kubernetes 集群中所有的证书都是由一个自定义的 CA 签发的,并且 kubernetes 集群都信任该 CA,因此基于该原理,使用 kubernetes 提供的 CertificateSigningRequest 来为我们的证书签名即可。

具体流程

kubernetes 官方的文档上提供了比较详细的说明,文档地址在这里:管理集群中的 TLS 认证

大致流程如下:

  • 使用 cfssl 为我们的 service 地址创建证书
  • 使用 CertificateSigningRequest 请求 kubernetes 的 CA 来为该证书签名
  • 管理员通过 kubectl certificate approve 来批准请求
  • 通过 kubectl get csr 获取签名后的证书,并使用到我们的服务中。

当然这个过程还不够自动化,我们可以使用一些很好的脚本来帮助我们完成这个工作:

这个脚本使用 openssl 来创建公私钥,创建 CertificateSigningRequest 请求对公钥签名,然后自动批准并将私钥和签名后的公钥存到 secret 中。使用方式如下:

./webhook-create-signed-cert.sh \
    --service lazykube-webhook-svc \
    --secret lazykube-webhook-certs \
    --namespace kube-system

这个脚本其实跟证书签发没有太大关系,但是如果你使用 mutatingwebhook 的话正好可以使用它,同理 validatingwebhook 也是如此

cat mutatingwebhook.yaml | \
    ./webhook-patch-ca-bundle.sh > \
    mutatingwebhook-ca-bundle.yaml

一些问题

我在使用上述脚本的时候,发现使用 rancher 创建的 kubernetes 集群有问题。这是因为 rancher 创建的 kubernetes 集群没有默认开启 kubernetes controller manager 的签名选项。具体的选项可以参考这里:给集群管理员的一个建议

修改的方案就是打开该选项,rancher 中可以在界面上编辑集群的 yaml 文件,加上以下参数:

services:
  kube-controller: 
    extra_args: 
      cluster-signing-cert-file: "/etc/kubernetes/ssl/kube-ca.pem"
      cluster-signing-key-file: "/etc/kubernetes/ssl/kube-ca-key.pem"

Golang 中的错误处理建议

一、概述

Golang 的错误处理一直是一个比较讨论比较多的话。我刚接触 Golang 的时候也看过关于错误处理的一些文档,但是并没有放在心上。在我使用 Golang 一段时间之后,我觉得我可能无法忽略这个问题。因此,这篇文章主要是为了整理一些在 Golang 中常用的错误处理技巧和原则。

二、错误处理的技巧和原则

2.1 使用封装来避免重复的错误判断

在 Golang 的项目中,最多的一句代码肯定是 if err != nil。Golang 将错误作为返回值,因此你不得不处理这些错误。但是有的时候,错误处理的判断可能会占据你的代码的一半篇幅,这使得代码看起来乱糟糟的。在官方的博客中有一个这样的例子:

_, err = fd.Write(p0[a:b])
if err != nil {
    return err
}
_, err = fd.Write(p1[c:d])
if err != nil {
    return err
}
_, err = fd.Write(p2[e:f])
if err != nil {
    return err
}
// and so on

是的,你没有看错,这里其实就是调用了 3 行 fd.Write,但是你不得不写上 9 错误判断。因此官方的博客中也给出了一个比较优雅的处理方案:将 io.Writer 再封装一层。

type errWriter struct {
    w   io.Writer
    err error
}

func (ew *errWriter) write(buf []byte) {
    if ew.err != nil {
        return
    }
    _, ew.err = ew.w.Write(buf)
}

ew := &errWriter{w: fd}
ew.write(p0[a:b])
ew.write(p1[c:d])
ew.write(p2[e:f])
// and so on
if ew.err != nil {
    return ew.err
}

现在看上去就好多了,write(buf []byte) 方法在内部判断了错误值,来避免在外面多次的错误判断。当然,这种写法可能也有它的弊端,比如你没有办法知道出错在哪一行调用。大多数情况下,你只需要检查错误,然后进行处理而已。因此这种技巧还是很有用的。Golang 的标准库也有很多类似的技巧。比如

b := bufio.NewWriter(fd)
b.Write(p0[a:b])
b.Write(p1[c:d])
b.Write(p2[e:f])
// and so on
if b.Flush() != nil {
    return b.Flush()
}

其中 b.Write 是有错误值返回的,这只是为了符合 io.Writer 接口。你可以在调用 b.Flush() 的时候再进行错误值的判断。

2.2 Golang 1.13 前的错误处理

检验错误

大多数情况下,我们只需要对错误进行简单的判断即可。因为我们不需要对错误做其他的处理,只需要保证代码逻辑正确执行即可。

if err != nil {
    // something went wrong
}

但有的时候我们需要根据错误类型进行不同的处理,比如网络连接未连接/断开导致的错误,我们应该在判断是未连接/断开时,进行重连操作。 在涉及到错误类型的判断时,我们通常有两种方法

  1. 将错误和已知的值进行比对
var ErrNotFound = errors.New("not found")

if err == ErrNotFound {
    // something wasn't found
}
  1. 判断错误的具体类型
type NotFoundError struct {
    Name string
}

func (e *NotFoundError) Error() string { return e.Name + ": not found" }

if e, ok := err.(*NotFoundError); ok {
    // e.Name wasn't found
}

添加信息

当一个错误在经过多层的调用栈向上返回时,我们通常会在这个错误上添加一些额外的信息,以帮助开发人员判断错误出现时程序运行到了哪里,发生了什么。最简单的方式是,使用之前的错误信息构造新的错误:

if err != nil {
    return fmt.Errorf("decompress %v: %v", name, err)
}

使用 fmt.Errorf 只保留了上一个错误的文本,丢弃了其他所有的信息。如果我们想保留上一个错误的所有信息,我们可以使用下面的方式:

type QueryError struct {
    Query string
    Err   error
}

if e, ok := err.(*QueryError); ok && e.Err == ErrPermission {
    // query failed because of a permission problem
}

2.3 Golang 1.13 中的错误处理

Golang 1.13 中,如果一个错误包含了另一个错误,则可以通过实现 Unwrap() 方法来返回底层的错误。如果 e1.Unwrap() 返回了 e2,我们就可以说 e1 包含了 e2。

使用 Is 和 As 来检验错误

在 2.2 中提到了错误信息的常见处理方式,在 Golang 1.13 中,标准库中添加了几个方法来帮助我们更快速的完成以上的工作。当前前提是,你的自定义 Error 正确的实现了 Unwrap() 方法

errors.Is 用来将一个错误和一个值进行对比:

// Similar to:
//   if err == ErrNotFound { … }
if errors.Is(err, ErrNotFound) {
    // something wasn't found
}

errors.As 用来判断一个错误是否是一个特定的类型:

// Similar to:
//   if e, ok := err.(*QueryError); ok { … }
var e *QueryError
if errors.As(err, &e) {
    // err is a *QueryError, and e is set to the error's value
}

当操作一个包装了的错误时,IsAs 会考虑错误链上所有的错误。一个完整的例子如下:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

type ErrorB struct {
    Msg string
    Err *ErrorA
}

func (e *ErrorB) Error() string {
    return e.Msg + e.Err.Msg
}

func (e *ErrorB) Unwrap() error {
    return e.Err
}

func main() {
    a := &ErrorA{"error a"}

    b := &ErrorB{"error b", a}

    if errors.Is(b, a) {
        log.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        log.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

使用 %w 来包装错误

Go 1.13 中增加了 %w,当 %w 出现时,由 fmt.Errorf 返回的错误,将会有 Unwrap 方法,返回的是 %w 对应的值。下面是一个简单的例子:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

func main() {
    a := &ErrorA{"error a"}

    b := fmt.Errorf("new error: %w", a)

    if errors.Is(b, a) {
        fmt.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        fmt.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

是否需要对错误进行包装

当你向一个 error 中添加额外的上下文信息时,要么使用 fmt.Errorf,要么实现一个自定义的错误类型,这是你就要决定这个新的错误是否应该包装原始的错误信息。这是一个没有标准答案的问题,它取决于新错误创建的上下文。

包装一个错误是为了将它暴露给调用者。这样调用者就可以根据不同的原始错误作出不同的处理,比如 os.Open(file) 会返回文件不存在这种具体的错误, 这样调用者就可以通过创建文件来让代码可以正确往下执行。

当我们不想暴露实现细节时就不要包装错误。因为暴露一个具备细节的错误,就意味的调用者和我们的代码产生了耦合。这也违反了抽象的原则。

三、参考资料

go 调度器的实现

一、概述

goroutine 是 go 语言的特色之一,在 go 语言中,你可以轻易的使用 go 关键字来创建一个协程运行一段代码,协程在使用上和我们常说的线程相似,但是在 go 中,协程的实现并非是直接使用线程。原因有二:

  • 线程的实现由操作系统决定,它附带了太多额外的特性,比如线程有它自己的信号掩码,线程能够被赋予 CPU affinity 功能,线程能够被添加到 Cgroup 中,线程所使用的资源也可以被查询到,线程的栈空间大小默认是 2M 等等。这些在 goroutine 中不需要的特性带来了额外的性能开销。

  • 除了上面的性能问题,在 go 语言的编程模型中,操作系统无法作出最佳的决策。比如在运行一次垃圾收集时,go 的垃圾收集器要求所有线程都被停止,并且内存处于一致性状态(关于内存一致性,可以参考这篇文章:内存一致性模型)。这个涉及到要等待全部运行时线程(running threads)到达一个点(point),我们事先知道在这个点内存是一致的。当在一个随机点上调度了多个线程,你不得不等待他们中的大多数到达一致性状态。go 调度程序可以决定仅在知道内存一致的点上进行调度,这样在任何时刻,当前没有被调度的线程是处于内存一致状态的,这意味着当我们要为垃圾收集停止线程运行时,我们只需要等待那些 CPU 核心上运行的线程。

因此,go 语言实现一个自己的调度器,它可以创建更轻量的线程,也就是 goroutine,同时,调度算法需要更智能,以提升大量并发下的性能。

在讨论 go 的调度器之前,我们还要讨论一下常见的线程模型,具体的内容可以参考这篇文章:内核线程与用户线程的一点小总结.

我们根据线程的调度实现,将线程分为内核线程和用户线程。其中,内核线程是由操作系统调度,而用户线程是由用户自己实现的调度器进行调度。根据用户线程和内核线程的关系分为下面几种线程模型:

1:1模型

1:1 模型很简单,就是将一个用户线程映射或绑定到一个内核线程上,但是这会导致线程的上下文切换会很慢。

N:1模型

N:1 模型中会将多个用户线程映射或绑定到一个内核线程上,这样多个用户线程之间的上下文切换会很快,但是缺点是没法利用多核的优势。

M:N模型

1:1 模型 和 N:1 模型都有它们各自的缺点,因此 go 调度器中使用了 M:N 模型,既能保证利用到多核的优势,也能保证更快的上下文切换。这也是我们接下来要讨论的问题。

二、go 1.0 的调度器实现方案

go 1.0 的调度器实现方案其实并不是这篇文章的关注点,因为它的生命周期太短。之所以要拿出来单独说,是为了更好的理解 go 1.1 中调度器实现方案解决的问题以及带来的性能提升。

为了更好的阐述这中间的机制,我们使用符号 M 来表示内核线程,使用符号 G 来表示 goroutine。在这个版本中,只有一个全局的 goroutine 队列,所有的内核线程都要从这个队列中取出 和放回goroutine。下图是一个包含两个内核线程,只有一个全局队列的例子。

go scheduler 1.0

只有一个全局队列,导致无法保证一个 goroutine 会在同一个内核线程上调度。因此,一个 goroutine 会在不同的内核线程上调度,导致上下文切换较慢,非常影响性能。下面是一个阻塞通道的例子,用来说明这个问题:

goroutine G7 阻塞在 channel 上,等待接收一个消息。一旦接收到这个消息,G7 就被放到全局队列中。

go scheduler 1.0

G7 被放到队列尾后,队列首的 GX 得到了被执行的机会,因此它被调度上第一个 M 上执行。此时,G8 也阻塞在 channel 上。

go scheduler 1.0

G7 重新得到了执行的机会,但是因为第一个 M 正在执行 GX,因此 G7 只能调度到第二个 M 上执行。这时候就发生了跨内核线程的上下文切换。

go scheduler 1.0

只有一个全局队列还带来了另外一个问题,因为从队列中获取 goroutine 必须要加锁,导致锁的争用非常频繁。尤其是在大量 goroutine 被调度的情况下,对性能的影响也会非常明显。

另外在 Scalable Go Scheduler Design Doc 这篇文章中还提到了另外两个问题。

  • 所有的 M 都关联了内存缓存(mcache)和其他的缓存(栈空间),但实际上只有正在运行的 go 代码的 M 才需要 mcache(阻塞在系统调用的 M 不需要 mcache)。运行 go 代码的 M 和系统调用阻塞的 M 比例大概在 1:100,这就导致了大量的资源消耗(每个 mcache 会占用到 2M)以及 poor data locality(poor data locality找不到好的翻译,意思大概是内存缓存命中会很少,导致内存缓存无效,可参考这里:What does it mean that hash sets have poor data locality?

  • 激进的线程阻塞/解阻塞。因为系统调用导致工作线程经常被阻塞和解阻塞,这增加了很多的负担。

上面就是4个关于 go 1.0 中调度器的问题所在。Dmitry Vyukov 因此提出了新的调度算法,并在 go 1.1 中发布。

三、go 1.1 之后的调度器实现方案

针对在 1.0 中调度器实现的问题,go 1.1 在 M, G 的基础上,引入了新的角色 P(processor)。以下简单列出新的调度算法的改进点:

  • 新角色 P 代替了 M 的一部分功能,M 的 mcache 现在属于 P 了,并且 P 的数量等于 GOMAXPROCS。M 要执行 G 的时候,就被调度绑定一个 P,然后去执行 G,这样对内存的占用就大大减少。
  • 每个 P 都有自己的 goroutine 队列 runq,新的 G 就放到自己的 runq 上,满了之后再放到全局的 runq,优先执行自己的 runq。这样的设计大大减少了锁的争用,并且可以保证尽量少的在多个核心上传递 G。
  • 当 G 执行网络操作和锁切换时,G 和 M 分离,M 通过调度执行新的 G。这样就可以保证用户在 G 中执行网络操作时不用考虑阻塞线程的问题。
  • 当 M 因为执行系统调用阻塞或 cgo 运行一段时间后,sysmon 协程会将 P 和 M分离,由其他的 M 来结合 P 进行调度。这样 M 就不用因为阻塞而占用不必要的资源。

下面是一个比较易懂的图例:

cast

M 是内核线程,P 是调度的上下文,G 是 goroutine。

in-motion

这里可以看到。M 绑定一个 P 来执行 G,灰色部分代表待执行的、只属于 P 的 goroutine 队列。

syscall

M0 线程因为执行 syscall 导致阻塞,因此调度算法将其和 P 分离,防止其占用 P 的资源,然后将 P 分给 M1 来执行剩下的 goroutine。

steal

第二个 P 中的 goroutine 已经执行结束。因此从第一个 P 中“偷来”一部分的 goroutine 执行。

下面是一个比较详细的 GPM 模型示意图:

gpm

四、附加的参考资料

下面的一些图片是从腾讯技术团队分享的 PPT 中截取的,PPT 地址:深入浅出Golang Runtime

go-runtime

五、参考文档

mock 测试和 gomock 的使用

mock 测试是什么

在平常做单元测试中,常常会依赖外部的系统,这导致单元测试很难写。比如业务系统中有一个用户信息更新的函数 UpdateUserInfo,如果对该函数做单元测试,则需要连接数据库,建立测试所需的基础数据,然后执行测试,最后清除测试导致的数据更新。这导致单元测试的成本很高,并且难以维护。

这时候,mock 测试就可以发挥它的作用了。我们将对数据库的操作做成假的,也就是 mock 出一个假的数据库操作对象,然后注入到我们的业务逻辑中使用,然后就可以对业务逻辑进行测试。

看了描述可能还是有点糊涂,下面会用一个例子来说明

一个 mock 测试的例子

这个例子是一个简单的用户登录,其中,UserDBI 是用户表操作的接口,其实现是UserDB,我们的业务层有 UserService,实现了 Login 方法,我们现在要做的就是对 Login 这里的业务逻辑进行单元测试。项目结构如下:

.
├── db
│   └── userdb.go
├── go.mod
├── go.sum
├── mocks
└── service
    ├── user.go
    └── user_test.go

UserDBI 的代码如下:

type UserDBI interface {
    Get(name string, password string) (*User, error)
}

UserDB 的相关代码如下:


type UserDB struct { db *sql.DB } func NewUserDB(user string, password string, host string, port int, db string) (UserDBI, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, db) var userDB UserDB var err error userDB.db, err = sql.Open("mysql", dsn) if err != nil { return nil, err } return &userDB, nil } // Get 根据 UserID 获取用户资料 func (udb *UserDB) Get(name string, password string) (*User, error) { s := "SELECT * FROM user WHERE name = ? AND password = ?" stmt, err := udb.db.Prepare(s) if err != nil { return nil, err } defer stmt.Close() var user User err = stmt.QueryRow(name, password).Scan(&user) if err != nil { return nil, err } return &user, nil }

Login 的逻辑如下:


type UserService struct { db db.UserDBI } // NewUserService 实例化用户服务 func NewUserService(db db.UserDBI) *UserService { var userService UserService userService.db = db return &userService } // Login 登录 func (userService *UserService) Login(name, password string) (*db.User, error) { user, err := userService.db.Get(name, password) if err != nil { log.Println(err) return nil, err } return user, nil }

可以知道,通过 NewUserService 可以实例化出 UserService 对象,然后调用 Login 即可实现登录逻辑,但是在 Login 中调用了 UserDB 的 Get 方法,而 Get 方法又会从实际的数据库中去查询。这就是我们这个例子的测试难点:有没有办法不依赖实际的数据库去完成单元测试呢?

这里我们的 NewUserService 的参数是 UserDBI 这个接口,在实际的代码运行中,我们是将 UserDB 的实例化对象传进去的,但是在测试的时候,我们完全可以传入一个不操作数据库的假的对象,这个对象只需要实现了 UserDBI 的接口即可。因此我们创建了一个 FakeUserDB,这个 FakeUserDB 就是我们 mock 出来的内容了。这个 FakeUserDB 非常简单,因为它什么也不包含。

type FakeUserDB struct {
}

然后,这个 FakeUserDB 实现了 Get 方法,如下:

func (db *FakeUserDB) Get(name string, password string) (*User, error) {
    if name == "user" && password == "123456" {
        return &User{ID: 1, Name: "user", Password: "123456", Age: 20, Gender: "male"}, nil
    } else {
        return nil, errors.New("no such user")
    }
}

这里的 Get 方法中既可以返回正常情况,又可以返回错误的情况,完全满足我们的测试需求。这样,我们就完成 mock 测试的一大半内容了,接下来我们来实际写单元测试即可。

func TestUserLoginWithFakeDB(t *testing.T) {

    testcases := []struct {
        Name        string
        Password    string
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, true},
    }

    var fakeUserDB db.FakeUserDB
    userService := NewUserService(&fakeUserDB)
    for i, testcase := range testcases {

        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

执行单元测试:

$ go test github.com/joyme123/gomock-examples/service
ok      github.com/joyme123/gomock-examples/service     0.002s

可以看出,我们在测试时使用了 FakeUserDB,这样就彻底摆脱了数据库,并且这里的单元测试考虑了登录成功和登录失败的方式。

但是手写 FakeUserDB 同样也有点工作量,这个例子为了简洁所以体现不出来。考虑当 UserDBI 这个接口的方法很多的时候,我们需要额外手写的代码量立马就多了起来。还好 go 官方就提供了 gomock 这个工具,来帮我们更好的完成单元测试的工作。

gomock 的使用

gomock 的官方仓库地址是:https://github.com/golang/mock.git。gomock 并不复杂,其主要的工作是将我们刚刚的 FakeUserDB 由手动编写变成自动生成。因此我会用刚刚的例子加上 gomock 再做一遍示范。

gomock 的安装

执行以下命令即可安装:

GO111MODULE=on go get github.com/golang/mock/mockgen@latest

mockgen 会安装在你的 $GOPATH 下的 bin 目录中。

gomock 生成代码

在上面的例子中,我们用 FakeUserDB 实现了 UserDBI 这个接口,这里同样也是使用 mockgen 这个程序生成实现 UserDBI 的代码。

mkdir mocks
mockgen -package=mocks -destination=mocks/userdb_mock.go github.com/joyme123/gomock-examples/db UserDBI

在 mocks 下生成的文件如下:

// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/joyme123/gomock-examples/db (interfaces: UserDBI)

// Package mocks is a generated GoMock package.
package mocks

import (
    gomock "github.com/golang/mock/gomock"
    db "github.com/joyme123/gomock-examples/db"
    reflect "reflect"
)

// MockUserDBI is a mock of UserDBI interface
type MockUserDBI struct {
    ctrl     *gomock.Controller
    recorder *MockUserDBIMockRecorder
}

// MockUserDBIMockRecorder is the mock recorder for MockUserDBI
type MockUserDBIMockRecorder struct {
    mock *MockUserDBI
}

// NewMockUserDBI creates a new mock instance
func NewMockUserDBI(ctrl *gomock.Controller) *MockUserDBI {
    mock := &MockUserDBI{ctrl: ctrl}
    mock.recorder = &MockUserDBIMockRecorder{mock}
    return mock
}

// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockUserDBI) EXPECT() *MockUserDBIMockRecorder {
    return m.recorder
}

// Get mocks base method
func (m *MockUserDBI) Get(arg0, arg1 string) (*db.User, error) {
    m.ctrl.T.Helper()
    ret := m.ctrl.Call(m, "Get", arg0, arg1)
    ret0, _ := ret[0].(*db.User)
    ret1, _ := ret[1].(error)
    return ret0, ret1
}

// Get indicates an expected call of Get
func (mr *MockUserDBIMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
    mr.mock.ctrl.T.Helper()
    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockUserDBI)(nil).Get), arg0, arg1)
}

执行测试

代码生成结束之后,我们开始写单元测试了。

func TestUserLoginWithGoMock(t *testing.T) {
    testcases := []struct {
        Name        string
        Password    string
        MockUser    *db.User
        MockErr     error
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, nil, &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, errors.New(""), nil, true},
    }

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    userDB := mocks.NewMockUserDBI(ctrl)

    for i, testcase := range testcases {
        userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr)
        userService := NewUserService(userDB)
        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

我们在测试用例中增加了两个字段:MockUser, MockErr,这就是我们 Mock 出来的数据,通过 userDB := mocks.NewMockUserDBI(ctrl) 实例化 mock 出来的 userDB,这里的 userDB 等价于上一个例子中的 fakeUserDB,然后调用 userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr) 这句话,来输入我们想输入的参数,产生我们想要的输出即可。这样在 Login 函数执行时会自动产生我们刚刚设定的 Mock 数据,完成单元测试的需求。

如果传参的时候,对参数不确定,可以使用 gomock.Any() 来代替,如果希望多次调用该方法仍然返回相同的结果,可以使用 .AnyTimes()

总结

mock 测试在实现上的重点是将外部依赖实现成可替换的,例子中使用了 UserDBI 这个接口来抽象出用户表的操作,然后使用参数的方式来实现 UserService 的实例化。接口和使用参数来实例化(也就是不要把外部依赖写死)缺一不可。只要注意到这一点就可以写出方便 mock 测试的代码。

容器中程序的信号捕捉

一、问题描述

项目中使用了 argo 在 kubernetes 集群中做工作流的调度。argo 提供了工作流的停止功能,其原理大致是检查正在运行的 Pod,向该 Pod 中的 wait 容器发送 USR2 信号,wait 容器收到 USR2 信号后,在主机上的调用 docker kill --signal TERM main_container_id 来停止我们的程序容器, 如果 10s 后容器还未停止,则发送 SIGKILL 来强制终止。但是我在实现 argo 工作流中调度 tfjob 时出现了一些问题。

argo_scheduler_tfjob

在argo停止工作流时,正在运行的 step2 中的 manager 监听了 TERM 信号,以便在工作流停止时同步停止 tfjob。但是事实情况却是 manager 退出了,但是没有收到任何的 TERM 信号。

二、问题剖析

检查这个问题的第一步是弄清楚 docker kill 背后发生了什么,官网的资料中有以下的描述:

Note: ENTRYPOINT and CMD in the shell form run as a subcommand of /bin/sh -c, which does not pass signals. This means that the executable is not the container’s PID 1 and does not receive Unix signals.

当我们用 sh 执行一段 shell script 时,在 shell script 中的可执行文件的 PID 不是1,并且 sh 也不会帮忙转发 TERM 信号,导致我们的可执行文件无法接收到终止信号,并执行清理逻辑。

我们的 manager 确实是用了一段 shell script 来启动的,可能就是因为这个原因导致无法收到 TERM 信号。

三、问题复现

我写了一段很简单的 go 程序,监听了 TERM 信号,然后打印一段文字。

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

    s, ok := <-sigs
    if !ok {
        log.Println("信号接收出错")
        os.Exit(1)
    }

    log.Println("收到信号:", s.String())
}

我的 Dockerfile 如下:

FROM alpine:latest
LABEL maintainr="jiangpengfei <jiangpengfei12@gmail.com>"

COPY main /usr/bin/main
COPY run.sh /usr/bin/run.sh
RUN chmod +x /usr/bin/main && chmod +x /usr/bin/run.sh

CMD ["sh", "-c", "/usr/bin/run.sh"]

run.sh 如下:

#!/bin/sh
/usr/bin/main

执行这个容器后,查看容器内的进程:

PID   USER     TIME  COMMAND
    1 root      0:00 {busybox} ash /usr/bin/run.sh
    6 root      0:00 /usr/bin/main
   12 root      0:00 sh
   17 root      0:00 ps

可以发现,run.sh 是 PID 为1, main 程序是6。此时我们使用 docker kill --signal TERM main_container_id 来停止容器,发现确实是没有反应的。因为 TERM 信号会发送给 PID 为 1 的进程。同时也因为 sh 不响应 TERM 信号,也不会转发该信号给子进程,所以容器也不会退出。如果我们使用 docker stop 退出的话,会发现很慢,这是因为 docker stop 会尝试先用 TERM 信号来终止进程,一段时间后发现没有退出的话再使用 KILL 信号。

四、解决方案

这个问题的解决方案有很多,要么让我们的程序进程成为 PID 1,要么让 PID 为 1 的进程转发这个 TERM 信号给我们的子进程。

方法一: 在 shell script 中使用 exec

将我们的 run.sh 改成如下:

#!/bin/sh
exec /usr/bin/main

然后再查看容器内的进程列表:

PID   USER     TIME  COMMAND
    1 root      0:00 /usr/bin/main
   11 root      0:00 sh
   16 root      0:00 ps

可以发现,main 进程的PID 是 1, 我们使用 docker kill --signal TERM main_container_id 来杀死进程,出现如下打印语句:

2020/01/17 23:46:24 收到信号: terminated

可见,exec 可以让我们的 main 进程成为 PID 为 1, 关于 exec 的作用描述如下:

The exec() family of functions replaces the current process image with a new process image.

即使用新进程的镜像替换当前进程的镜像数据,可以理解为exec系统调用并没有创建新的进程,只是替换了原来进程上下文的内容。原进程的代码段,数据段,堆栈段被新的进程所代替。这样我们的 main 进程就顺利成章的替换了 sh 进程成为 PID 为 1 的进程了。

方法二: 直接使用 main 作为镜像入口

这是最简单的方法了,但是很多时候会有限制,因为我们希望在 shell script 中写一些逻辑来调用程序。

方法三: 借助第三方程序

一些第三方的程序专门提供了这样的作用,以它们作为启动的入口,这些第三方程序会 watch 所有它产生的子进程,在这些子进程退出后自动退出,并且在其收到 TERM 信号后发送给子进程。

这里我们用 smell-baron 这个应用作为例子

修改 Dockerfile:

FROM alpine:latest
LABEL maintainr="jiangpengfei <jiangpengfei12@gmail.com>"

COPY main /usr/bin/main
COPY run.sh /usr/bin/run.sh
RUN chmod +x /usr/bin/main && chmod +x /usr/bin/run.sh
RUN wget -O /usr/bin/smell-baron https://github.com/insidewhy/smell-baron/releases/download/v0.4.2/smell-baron.musl && chmod +x /usr/bin/smell-baron

CMD ["/usr/bin/smell-baron", "/usr/bin/run.sh"]

查看容器内的进程:

PID   USER     TIME  COMMAND
    1 root      0:00 /usr/bin/smell-baron /usr/bin/run.sh
    6 root      0:00 /usr/bin/main
   14 root      0:00 sh
   19 root      0:00 ps

使用 docker kill 发现 main 收到了 TERM 信号。

1.Multiple commands can be run, smell-baron will exit when all the watched processes have exited.

2.Whether a spawned process is watched can be configured.

3.smell-baron can be told to signal all child processes on termination, this allows it to cleanly deal with processes that spawn a subprocess in a different process group then fail to clean it up on exit.

kubernetes存储–FlexVolume

简介

kubernetes 使用 volume 来满足它的存储需求,它支持很多的存储系统,比如 nfs、 glusterfs、cephfs等等,但是这些存储的实现方式有一个问题,就是它们的实现代码都必须合并到 Kubernetes 的代码中(称为 in-tree),这为 kubernetes 社区带来了维护上的成本。因此,kubernetes 提出了两种 out-of-tree 的方案: FlexVolume 和 csi。通过这两种方案实现的存储功能不必合并到 kubernetes 的代码仓库,由存储系统的供应商单独维护。

FlexVolume 是这篇文章主要关注的点,FlexVolume 自 1.2 版本开始就支持了。它使用基于 exec 的模型来与驱动程序对接。用户必须在每个节点(有些情况下包括主节点)上的预定义卷插件路径中安装 FlexVolume 驱动程序的可执行文件。当需要挂载 volume 的时候,由 kubelet 执行挂载命令来挂载即可。

基于 nfs 实现 FlexVolume

在探究 FlexVolume 的实现原理之前,我们可以先看一下官方提供的基于 nfs 的例子

注: 我这里是用 minikube 启动的本地 kubernetes 集群。

为了部署基于 nfs 实现的 FlexVolume,我们首先将目录下的 nfs 复制到 deploy 文件夹下

$ cp nfs deploy

然后将 deploy/deploy.sh 中的 dummy 修改成 nfs,表示我们使用的插件脚本是 nfs 这个可执行文件。

接着在 deploy 文件夹下构建 docker 镜像,这里要修改 Dockerfile,将 nfs COPY 到镜像中。然后执行下面的命令(镜像标签需要修改成你自己的):

$ docker build -t joyme/nfs-flexvolume:1.0 .
$ docker push joyme/nfs-flexvolume:1.0

镜像构建并推送完成之后,我们就开始部署了。因为 FlexVolume 要求将驱动文件放在指定的目录下,最粗暴的方式就是手动将文件 scp 到集群的每个节点上。这里为了方便,我们还可以使用 kubernetes 的 Daemenset,然后使用 hostPath 将文件放到主机之上。我们修改 deploy 文件夹下的 ds.yaml 这个部署文件。将我们刚刚推送的镜像填进去。然后执行以下命令进行部署。

$ kubectl apply -f ds.yaml

这里有个地方要注意, 默认的插件安装地址是 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/, 但是 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 都可以修改这个值,如果你启动这些组件是指定了这些参数,那就需要修改 ds.yaml 中的路径。

在集群中部署完成之后,我们可以到某个节点上检查一下/usr/libexec/kubernetes/kubelet-plugins/volume/exec/是否存在我们刚刚部署的文件。

最后我们创建一个 nginx,挂载一个 FlexVolume。在创建之前,我们需要先启动一个 nfs server,这里为了方便,可以使用容器启动一个。

$ docker run -d --privileged --restart=always \
-v /tmp:/dws_nas_scratch \
-e NFS_EXPORT_DIR_1=/dws_nas_scratch \
-e NFS_EXPORT_DOMAIN_1=\* \
-e NFS_EXPORT_OPTIONS_1=ro,insecure,no_subtree_check,no_root_squash,fsid=1 \
-p 111:111 -p 111:111/udp \
-p 2049:2049 -p 2049:2049/udp \
-p 32765:32765 -p 32765:32765/udp \
-p 32766:32766 -p 32766:32766/udp \
-p 32767:32767 -p 32767:32767/udp \
fuzzle/docker-nfs-server:latest

使用官方提供的 nginx-nfs.yaml 文件,然后把其中的 server 地址修改一下,使用以下命令创建:

$ kubectl apply -f nginx-nfs.yaml

注意:如果出现错误,可以检查 node 上是否安装了 jq, nfs-common 等必要的依赖包。

实现原理

在完成上面例子的过程中,关于 FlexVolume 的大多数问题都比较好解答了。我们来看一下 nfs 的实现代码:

usage() {
    err "Invalid usage. Usage: "
    err "\t$0 init"
    err "\t$0 mount <mount dir> <json params>"
    err "\t$0 unmount <mount dir>"
    exit 1
}

err() {
    echo -ne $* 1>&2
}

log() {
    echo -ne $* >&1
}

ismounted() {
    MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
    if [ "${MOUNT}" == "${MNTPATH}" ]; then
        echo "1"
    else
        echo "0"
    fi
}

domount() {
    MNTPATH=$1

    NFS_SERVER=$(echo $2 | jq -r '.server')
    SHARE=$(echo $2 | jq -r '.share')

    if [ $(ismounted) -eq 1 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    mkdir -p ${MNTPATH} &> /dev/null

    mount -t nfs ${NFS_SERVER}:/${SHARE} ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount ${NFS_SERVER}:${SHARE} at ${MNTPATH}\"}"
        exit 1
    fi
    log '{"status": "Success"}'
    exit 0
}

unmount() {
    MNTPATH=$1
    if [ $(ismounted) -eq 0 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    umount ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
        exit 1
    fi

    log '{"status": "Success"}'
    exit 0
}

op=$1

if ! command -v jq >/dev/null 2>&1; then
    err "{ \"status\": \"Failure\", \"message\": \"'jq' binary not found. Please install jq package before using this driver\"}"
    exit 1
fi

if [ "$op" = "init" ]; then
    log '{"status": "Success", "capabilities": {"attach": false}}'
    exit 0
fi

if [ $# -lt 2 ]; then
    usage
fi

shift

case "$op" in
    mount)
        domount $*
        ;;
    unmount)
        unmount $*
        ;;
    *)
        log '{"status": "Not supported"}'
        exit 0
esac

exit 1

其实就是一段 shell 脚本,支持三个命令: init、mount、unmount。当我们在集群中为某个 pod 挂载 FlexVolume时,该 pod 所在节点的 kubelet 会调用其指定的插件脚本执行 mount 命令,然后挂载给 pod 使用。当然了,FlexVolume 还支持更复杂的插件。这个可以看官方的文档: flexvolume

部署方案

关于如何部署 FlexVolume 的插件,其实在例子中也有提到,这里可以总结一下:

  • 手动部署到每个节点的指定目录下,比如我们刚刚部署的 nfs ,其实际路径是: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/k8s~nfs。其中 /usr/libexec/kubernetes/kubelet-plugins/volume/exec 是默认路径,也可以通过 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 来指定。k8s~nfs 这个路径中,k8s 是供应商, nfs 是驱动名称,在使用的时候可以这样指定: `driver: “k8s/nfs”。

  • 使用 kubernetes 的 deamonset 配合 hostPath 来部署,因为 daemonset 会在每个节点上都启动 pod,然后通过 hostPath 将插件放在指定的位置即可。kubernetes 集群中 master 节点可能被设置成不允许调度。这种情况下 daemonset 默认不调度到 master 节点上,可以使用 tolerations 来解决这个问题. 具体可参考: Scheduler is not scheduling Pod for DaemonSet in Master node

  • 其实除了 kubelet 要调用插件之外,controller-manager 也要调用。比如执行 init, attach, detach, waitforattach, isattached 等命令。