kubernetes 集群中的证书签发

场景

在我们使用 kubernetes 的 admission webhook 机制实现一些集群资源认证、修改的方案时,会涉及到集群内部的 https 通信。这就涉及到我们的服务需要配置证书,并且要让 kubernetes 的组件信任该证书。我们都知道 kubernetes 集群中所有的证书都是由一个自定义的 CA 签发的,并且 kubernetes 集群都信任该 CA,因此基于该原理,使用 kubernetes 提供的 CertificateSigningRequest 来为我们的证书签名即可。

具体流程

kubernetes 官方的文档上提供了比较详细的说明,文档地址在这里:管理集群中的 TLS 认证

大致流程如下:

  • 使用 cfssl 为我们的 service 地址创建证书
  • 使用 CertificateSigningRequest 请求 kubernetes 的 CA 来为该证书签名
  • 管理员通过 kubectl certificate approve 来批准请求
  • 通过 kubectl get csr 获取签名后的证书,并使用到我们的服务中。

当然这个过程还不够自动化,我们可以使用一些很好的脚本来帮助我们完成这个工作:

这个脚本使用 openssl 来创建公私钥,创建 CertificateSigningRequest 请求对公钥签名,然后自动批准并将私钥和签名后的公钥存到 secret 中。使用方式如下:

./webhook-create-signed-cert.sh \
    --service lazykube-webhook-svc \
    --secret lazykube-webhook-certs \
    --namespace kube-system

这个脚本其实跟证书签发没有太大关系,但是如果你使用 mutatingwebhook 的话正好可以使用它,同理 validatingwebhook 也是如此

cat mutatingwebhook.yaml | \
    ./webhook-patch-ca-bundle.sh > \
    mutatingwebhook-ca-bundle.yaml

一些问题

我在使用上述脚本的时候,发现使用 rancher 创建的 kubernetes 集群有问题。这是因为 rancher 创建的 kubernetes 集群没有默认开启 kubernetes controller manager 的签名选项。具体的选项可以参考这里:给集群管理员的一个建议

修改的方案就是打开该选项,rancher 中可以在界面上编辑集群的 yaml 文件,加上以下参数:

services:
  kube-controller: 
    extra_args: 
      cluster-signing-cert-file: "/etc/kubernetes/ssl/kube-ca.pem"
      cluster-signing-key-file: "/etc/kubernetes/ssl/kube-ca-key.pem"

Golang 中的错误处理建议

一、概述

Golang 的错误处理一直是一个比较讨论比较多的话。我刚接触 Golang 的时候也看过关于错误处理的一些文档,但是并没有放在心上。在我使用 Golang 一段时间之后,我觉得我可能无法忽略这个问题。因此,这篇文章主要是为了整理一些在 Golang 中常用的错误处理技巧和原则。

二、错误处理的技巧和原则

2.1 使用封装来避免重复的错误判断

在 Golang 的项目中,最多的一句代码肯定是 if err != nil。Golang 将错误作为返回值,因此你不得不处理这些错误。但是有的时候,错误处理的判断可能会占据你的代码的一半篇幅,这使得代码看起来乱糟糟的。在官方的博客中有一个这样的例子:

_, err = fd.Write(p0[a:b])
if err != nil {
    return err
}
_, err = fd.Write(p1[c:d])
if err != nil {
    return err
}
_, err = fd.Write(p2[e:f])
if err != nil {
    return err
}
// and so on

是的,你没有看错,这里其实就是调用了 3 行 fd.Write,但是你不得不写上 9 错误判断。因此官方的博客中也给出了一个比较优雅的处理方案:将 io.Writer 再封装一层。

type errWriter struct {
    w   io.Writer
    err error
}

func (ew *errWriter) write(buf []byte) {
    if ew.err != nil {
        return
    }
    _, ew.err = ew.w.Write(buf)
}

ew := &errWriter{w: fd}
ew.write(p0[a:b])
ew.write(p1[c:d])
ew.write(p2[e:f])
// and so on
if ew.err != nil {
    return ew.err
}

现在看上去就好多了,write(buf []byte) 方法在内部判断了错误值,来避免在外面多次的错误判断。当然,这种写法可能也有它的弊端,比如你没有办法知道出错在哪一行调用。大多数情况下,你只需要检查错误,然后进行处理而已。因此这种技巧还是很有用的。Golang 的标准库也有很多类似的技巧。比如

b := bufio.NewWriter(fd)
b.Write(p0[a:b])
b.Write(p1[c:d])
b.Write(p2[e:f])
// and so on
if b.Flush() != nil {
    return b.Flush()
}

其中 b.Write 是有错误值返回的,这只是为了符合 io.Writer 接口。你可以在调用 b.Flush() 的时候再进行错误值的判断。

2.2 Golang 1.13 前的错误处理

检验错误

大多数情况下,我们只需要对错误进行简单的判断即可。因为我们不需要对错误做其他的处理,只需要保证代码逻辑正确执行即可。

if err != nil {
    // something went wrong
}

但有的时候我们需要根据错误类型进行不同的处理,比如网络连接未连接/断开导致的错误,我们应该在判断是未连接/断开时,进行重连操作。 在涉及到错误类型的判断时,我们通常有两种方法

  1. 将错误和已知的值进行比对
var ErrNotFound = errors.New("not found")

if err == ErrNotFound {
    // something wasn't found
}
  1. 判断错误的具体类型
type NotFoundError struct {
    Name string
}

func (e *NotFoundError) Error() string { return e.Name + ": not found" }

if e, ok := err.(*NotFoundError); ok {
    // e.Name wasn't found
}

添加信息

当一个错误在经过多层的调用栈向上返回时,我们通常会在这个错误上添加一些额外的信息,以帮助开发人员判断错误出现时程序运行到了哪里,发生了什么。最简单的方式是,使用之前的错误信息构造新的错误:

if err != nil {
    return fmt.Errorf("decompress %v: %v", name, err)
}

使用 fmt.Errorf 只保留了上一个错误的文本,丢弃了其他所有的信息。如果我们想保留上一个错误的所有信息,我们可以使用下面的方式:

type QueryError struct {
    Query string
    Err   error
}

if e, ok := err.(*QueryError); ok && e.Err == ErrPermission {
    // query failed because of a permission problem
}

2.3 Golang 1.13 中的错误处理

Golang 1.13 中,如果一个错误包含了另一个错误,则可以通过实现 Unwrap() 方法来返回底层的错误。如果 e1.Unwrap() 返回了 e2,我们就可以说 e1 包含了 e2。

使用 Is 和 As 来检验错误

在 2.2 中提到了错误信息的常见处理方式,在 Golang 1.13 中,标准库中添加了几个方法来帮助我们更快速的完成以上的工作。当前前提是,你的自定义 Error 正确的实现了 Unwrap() 方法

errors.Is 用来将一个错误和一个值进行对比:

// Similar to:
//   if err == ErrNotFound { … }
if errors.Is(err, ErrNotFound) {
    // something wasn't found
}

errors.As 用来判断一个错误是否是一个特定的类型:

// Similar to:
//   if e, ok := err.(*QueryError); ok { … }
var e *QueryError
if errors.As(err, &e) {
    // err is a *QueryError, and e is set to the error's value
}

当操作一个包装了的错误时,IsAs 会考虑错误链上所有的错误。一个完整的例子如下:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

type ErrorB struct {
    Msg string
    Err *ErrorA
}

func (e *ErrorB) Error() string {
    return e.Msg + e.Err.Msg
}

func (e *ErrorB) Unwrap() error {
    return e.Err
}

func main() {
    a := &ErrorA{"error a"}

    b := &ErrorB{"error b", a}

    if errors.Is(b, a) {
        log.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        log.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

使用 %w 来包装错误

Go 1.13 中增加了 %w,当 %w 出现时,由 fmt.Errorf 返回的错误,将会有 Unwrap 方法,返回的是 %w 对应的值。下面是一个简单的例子:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

func main() {
    a := &ErrorA{"error a"}

    b := fmt.Errorf("new error: %w", a)

    if errors.Is(b, a) {
        fmt.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        fmt.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

是否需要对错误进行包装

当你向一个 error 中添加额外的上下文信息时,要么使用 fmt.Errorf,要么实现一个自定义的错误类型,这是你就要决定这个新的错误是否应该包装原始的错误信息。这是一个没有标准答案的问题,它取决于新错误创建的上下文。

包装一个错误是为了将它暴露给调用者。这样调用者就可以根据不同的原始错误作出不同的处理,比如 os.Open(file) 会返回文件不存在这种具体的错误, 这样调用者就可以通过创建文件来让代码可以正确往下执行。

当我们不想暴露实现细节时就不要包装错误。因为暴露一个具备细节的错误,就意味的调用者和我们的代码产生了耦合。这也违反了抽象的原则。

三、参考资料

go 调度器的实现

一、概述

goroutine 是 go 语言的特色之一,在 go 语言中,你可以轻易的使用 go 关键字来创建一个协程运行一段代码,协程在使用上和我们常说的线程相似,但是在 go 中,协程的实现并非是直接使用线程。原因有二:

  • 线程的实现由操作系统决定,它附带了太多额外的特性,比如线程有它自己的信号掩码,线程能够被赋予 CPU affinity 功能,线程能够被添加到 Cgroup 中,线程所使用的资源也可以被查询到,线程的栈空间大小默认是 2M 等等。这些在 goroutine 中不需要的特性带来了额外的性能开销。

  • 除了上面的性能问题,在 go 语言的编程模型中,操作系统无法作出最佳的决策。比如在运行一次垃圾收集时,go 的垃圾收集器要求所有线程都被停止,并且内存处于一致性状态(关于内存一致性,可以参考这篇文章:内存一致性模型)。这个涉及到要等待全部运行时线程(running threads)到达一个点(point),我们事先知道在这个点内存是一致的。当在一个随机点上调度了多个线程,你不得不等待他们中的大多数到达一致性状态。go 调度程序可以决定仅在知道内存一致的点上进行调度,这样在任何时刻,当前没有被调度的线程是处于内存一致状态的,这意味着当我们要为垃圾收集停止线程运行时,我们只需要等待那些 CPU 核心上运行的线程。

因此,go 语言实现一个自己的调度器,它可以创建更轻量的线程,也就是 goroutine,同时,调度算法需要更智能,以提升大量并发下的性能。

在讨论 go 的调度器之前,我们还要讨论一下常见的线程模型,具体的内容可以参考这篇文章:内核线程与用户线程的一点小总结.

我们根据线程的调度实现,将线程分为内核线程和用户线程。其中,内核线程是由操作系统调度,而用户线程是由用户自己实现的调度器进行调度。根据用户线程和内核线程的关系分为下面几种线程模型:

1:1模型

1:1 模型很简单,就是将一个用户线程映射或绑定到一个内核线程上,但是这会导致线程的上下文切换会很慢。

N:1模型

N:1 模型中会将多个用户线程映射或绑定到一个内核线程上,这样多个用户线程之间的上下文切换会很快,但是缺点是没法利用多核的优势。

M:N模型

1:1 模型 和 N:1 模型都有它们各自的缺点,因此 go 调度器中使用了 M:N 模型,既能保证利用到多核的优势,也能保证更快的上下文切换。这也是我们接下来要讨论的问题。

二、go 1.0 的调度器实现方案

go 1.0 的调度器实现方案其实并不是这篇文章的关注点,因为它的生命周期太短。之所以要拿出来单独说,是为了更好的理解 go 1.1 中调度器实现方案解决的问题以及带来的性能提升。

为了更好的阐述这中间的机制,我们使用符号 M 来表示内核线程,使用符号 G 来表示 goroutine。在这个版本中,只有一个全局的 goroutine 队列,所有的内核线程都要从这个队列中取出 和放回goroutine。下图是一个包含两个内核线程,只有一个全局队列的例子。

go scheduler 1.0

只有一个全局队列,导致无法保证一个 goroutine 会在同一个内核线程上调度。因此,一个 goroutine 会在不同的内核线程上调度,导致上下文切换较慢,非常影响性能。下面是一个阻塞通道的例子,用来说明这个问题:

goroutine G7 阻塞在 channel 上,等待接收一个消息。一旦接收到这个消息,G7 就被放到全局队列中。

go scheduler 1.0

G7 被放到队列尾后,队列首的 GX 得到了被执行的机会,因此它被调度上第一个 M 上执行。此时,G8 也阻塞在 channel 上。

go scheduler 1.0

G7 重新得到了执行的机会,但是因为第一个 M 正在执行 GX,因此 G7 只能调度到第二个 M 上执行。这时候就发生了跨内核线程的上下文切换。

go scheduler 1.0

只有一个全局队列还带来了另外一个问题,因为从队列中获取 goroutine 必须要加锁,导致锁的争用非常频繁。尤其是在大量 goroutine 被调度的情况下,对性能的影响也会非常明显。

另外在 Scalable Go Scheduler Design Doc 这篇文章中还提到了另外两个问题。

  • 所有的 M 都关联了内存缓存(mcache)和其他的缓存(栈空间),但实际上只有正在运行的 go 代码的 M 才需要 mcache(阻塞在系统调用的 M 不需要 mcache)。运行 go 代码的 M 和系统调用阻塞的 M 比例大概在 1:100,这就导致了大量的资源消耗(每个 mcache 会占用到 2M)以及 poor data locality(poor data locality找不到好的翻译,意思大概是内存缓存命中会很少,导致内存缓存无效,可参考这里:What does it mean that hash sets have poor data locality?

  • 激进的线程阻塞/解阻塞。因为系统调用导致工作线程经常被阻塞和解阻塞,这增加了很多的负担。

上面就是4个关于 go 1.0 中调度器的问题所在。Dmitry Vyukov 因此提出了新的调度算法,并在 go 1.1 中发布。

三、go 1.1 之后的调度器实现方案

针对在 1.0 中调度器实现的问题,go 1.1 在 M, G 的基础上,引入了新的角色 P(processor)。以下简单列出新的调度算法的改进点:

  • 新角色 P 代替了 M 的一部分功能,M 的 mcache 现在属于 P 了,并且 P 的数量等于 GOMAXPROCS。M 要执行 G 的时候,就被调度绑定一个 P,然后去执行 G,这样对内存的占用就大大减少。
  • 每个 P 都有自己的 goroutine 队列 runq,新的 G 就放到自己的 runq 上,满了之后再放到全局的 runq,优先执行自己的 runq。这样的设计大大减少了锁的争用,并且可以保证尽量少的在多个核心上传递 G。
  • 当 G 执行网络操作和锁切换时,G 和 M 分离,M 通过调度执行新的 G。这样就可以保证用户在 G 中执行网络操作时不用考虑阻塞线程的问题。
  • 当 M 因为执行系统调用阻塞或 cgo 运行一段时间后,sysmon 协程会将 P 和 M分离,由其他的 M 来结合 P 进行调度。这样 M 就不用因为阻塞而占用不必要的资源。

下面是一个比较易懂的图例:

cast

M 是内核线程,P 是调度的上下文,G 是 goroutine。

in-motion

这里可以看到。M 绑定一个 P 来执行 G,灰色部分代表待执行的、只属于 P 的 goroutine 队列。

syscall

M0 线程因为执行 syscall 导致阻塞,因此调度算法将其和 P 分离,防止其占用 P 的资源,然后将 P 分给 M1 来执行剩下的 goroutine。

steal

第二个 P 中的 goroutine 已经执行结束。因此从第一个 P 中“偷来”一部分的 goroutine 执行。

下面是一个比较详细的 GPM 模型示意图:

gpm

四、附加的参考资料

下面的一些图片是从腾讯技术团队分享的 PPT 中截取的,PPT 地址:深入浅出Golang Runtime

go-runtime

五、参考文档

mock 测试和 gomock 的使用

mock 测试是什么

在平常做单元测试中,常常会依赖外部的系统,这导致单元测试很难写。比如业务系统中有一个用户信息更新的函数 UpdateUserInfo,如果对该函数做单元测试,则需要连接数据库,建立测试所需的基础数据,然后执行测试,最后清除测试导致的数据更新。这导致单元测试的成本很高,并且难以维护。

这时候,mock 测试就可以发挥它的作用了。我们将对数据库的操作做成假的,也就是 mock 出一个假的数据库操作对象,然后注入到我们的业务逻辑中使用,然后就可以对业务逻辑进行测试。

看了描述可能还是有点糊涂,下面会用一个例子来说明

一个 mock 测试的例子

这个例子是一个简单的用户登录,其中,UserDBI 是用户表操作的接口,其实现是UserDB,我们的业务层有 UserService,实现了 Login 方法,我们现在要做的就是对 Login 这里的业务逻辑进行单元测试。项目结构如下:

.
├── db
│   └── userdb.go
├── go.mod
├── go.sum
├── mocks
└── service
    ├── user.go
    └── user_test.go

UserDBI 的代码如下:

type UserDBI interface {
    Get(name string, password string) (*User, error)
}

UserDB 的相关代码如下:


type UserDB struct { db *sql.DB } func NewUserDB(user string, password string, host string, port int, db string) (UserDBI, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, db) var userDB UserDB var err error userDB.db, err = sql.Open("mysql", dsn) if err != nil { return nil, err } return &userDB, nil } // Get 根据 UserID 获取用户资料 func (udb *UserDB) Get(name string, password string) (*User, error) { s := "SELECT * FROM user WHERE name = ? AND password = ?" stmt, err := udb.db.Prepare(s) if err != nil { return nil, err } defer stmt.Close() var user User err = stmt.QueryRow(name, password).Scan(&user) if err != nil { return nil, err } return &user, nil }

Login 的逻辑如下:


type UserService struct { db db.UserDBI } // NewUserService 实例化用户服务 func NewUserService(db db.UserDBI) *UserService { var userService UserService userService.db = db return &userService } // Login 登录 func (userService *UserService) Login(name, password string) (*db.User, error) { user, err := userService.db.Get(name, password) if err != nil { log.Println(err) return nil, err } return user, nil }

可以知道,通过 NewUserService 可以实例化出 UserService 对象,然后调用 Login 即可实现登录逻辑,但是在 Login 中调用了 UserDB 的 Get 方法,而 Get 方法又会从实际的数据库中去查询。这就是我们这个例子的测试难点:有没有办法不依赖实际的数据库去完成单元测试呢?

这里我们的 NewUserService 的参数是 UserDBI 这个接口,在实际的代码运行中,我们是将 UserDB 的实例化对象传进去的,但是在测试的时候,我们完全可以传入一个不操作数据库的假的对象,这个对象只需要实现了 UserDBI 的接口即可。因此我们创建了一个 FakeUserDB,这个 FakeUserDB 就是我们 mock 出来的内容了。这个 FakeUserDB 非常简单,因为它什么也不包含。

type FakeUserDB struct {
}

然后,这个 FakeUserDB 实现了 Get 方法,如下:

func (db *FakeUserDB) Get(name string, password string) (*User, error) {
    if name == "user" && password == "123456" {
        return &User{ID: 1, Name: "user", Password: "123456", Age: 20, Gender: "male"}, nil
    } else {
        return nil, errors.New("no such user")
    }
}

这里的 Get 方法中既可以返回正常情况,又可以返回错误的情况,完全满足我们的测试需求。这样,我们就完成 mock 测试的一大半内容了,接下来我们来实际写单元测试即可。

func TestUserLoginWithFakeDB(t *testing.T) {

    testcases := []struct {
        Name        string
        Password    string
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, true},
    }

    var fakeUserDB db.FakeUserDB
    userService := NewUserService(&fakeUserDB)
    for i, testcase := range testcases {

        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

执行单元测试:

$ go test github.com/joyme123/gomock-examples/service
ok      github.com/joyme123/gomock-examples/service     0.002s

可以看出,我们在测试时使用了 FakeUserDB,这样就彻底摆脱了数据库,并且这里的单元测试考虑了登录成功和登录失败的方式。

但是手写 FakeUserDB 同样也有点工作量,这个例子为了简洁所以体现不出来。考虑当 UserDBI 这个接口的方法很多的时候,我们需要额外手写的代码量立马就多了起来。还好 go 官方就提供了 gomock 这个工具,来帮我们更好的完成单元测试的工作。

gomock 的使用

gomock 的官方仓库地址是:https://github.com/golang/mock.git。gomock 并不复杂,其主要的工作是将我们刚刚的 FakeUserDB 由手动编写变成自动生成。因此我会用刚刚的例子加上 gomock 再做一遍示范。

gomock 的安装

执行以下命令即可安装:

GO111MODULE=on go get github.com/golang/mock/mockgen@latest

mockgen 会安装在你的 $GOPATH 下的 bin 目录中。

gomock 生成代码

在上面的例子中,我们用 FakeUserDB 实现了 UserDBI 这个接口,这里同样也是使用 mockgen 这个程序生成实现 UserDBI 的代码。

mkdir mocks
mockgen -package=mocks -destination=mocks/userdb_mock.go github.com/joyme123/gomock-examples/db UserDBI

在 mocks 下生成的文件如下:

// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/joyme123/gomock-examples/db (interfaces: UserDBI)

// Package mocks is a generated GoMock package.
package mocks

import (
    gomock "github.com/golang/mock/gomock"
    db "github.com/joyme123/gomock-examples/db"
    reflect "reflect"
)

// MockUserDBI is a mock of UserDBI interface
type MockUserDBI struct {
    ctrl     *gomock.Controller
    recorder *MockUserDBIMockRecorder
}

// MockUserDBIMockRecorder is the mock recorder for MockUserDBI
type MockUserDBIMockRecorder struct {
    mock *MockUserDBI
}

// NewMockUserDBI creates a new mock instance
func NewMockUserDBI(ctrl *gomock.Controller) *MockUserDBI {
    mock := &MockUserDBI{ctrl: ctrl}
    mock.recorder = &MockUserDBIMockRecorder{mock}
    return mock
}

// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockUserDBI) EXPECT() *MockUserDBIMockRecorder {
    return m.recorder
}

// Get mocks base method
func (m *MockUserDBI) Get(arg0, arg1 string) (*db.User, error) {
    m.ctrl.T.Helper()
    ret := m.ctrl.Call(m, "Get", arg0, arg1)
    ret0, _ := ret[0].(*db.User)
    ret1, _ := ret[1].(error)
    return ret0, ret1
}

// Get indicates an expected call of Get
func (mr *MockUserDBIMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
    mr.mock.ctrl.T.Helper()
    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockUserDBI)(nil).Get), arg0, arg1)
}

执行测试

代码生成结束之后,我们开始写单元测试了。

func TestUserLoginWithGoMock(t *testing.T) {
    testcases := []struct {
        Name        string
        Password    string
        MockUser    *db.User
        MockErr     error
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, nil, &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, errors.New(""), nil, true},
    }

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    userDB := mocks.NewMockUserDBI(ctrl)

    for i, testcase := range testcases {
        userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr)
        userService := NewUserService(userDB)
        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

我们在测试用例中增加了两个字段:MockUser, MockErr,这就是我们 Mock 出来的数据,通过 userDB := mocks.NewMockUserDBI(ctrl) 实例化 mock 出来的 userDB,这里的 userDB 等价于上一个例子中的 fakeUserDB,然后调用 userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr) 这句话,来输入我们想输入的参数,产生我们想要的输出即可。这样在 Login 函数执行时会自动产生我们刚刚设定的 Mock 数据,完成单元测试的需求。

如果传参的时候,对参数不确定,可以使用 gomock.Any() 来代替,如果希望多次调用该方法仍然返回相同的结果,可以使用 .AnyTimes()

总结

mock 测试在实现上的重点是将外部依赖实现成可替换的,例子中使用了 UserDBI 这个接口来抽象出用户表的操作,然后使用参数的方式来实现 UserService 的实例化。接口和使用参数来实例化(也就是不要把外部依赖写死)缺一不可。只要注意到这一点就可以写出方便 mock 测试的代码。

容器中程序的信号捕捉

一、问题描述

项目中使用了 argo 在 kubernetes 集群中做工作流的调度。argo 提供了工作流的停止功能,其原理大致是检查正在运行的 Pod,向该 Pod 中的 wait 容器发送 USR2 信号,wait 容器收到 USR2 信号后,在主机上的调用 docker kill --signal TERM main_container_id 来停止我们的程序容器, 如果 10s 后容器还未停止,则发送 SIGKILL 来强制终止。但是我在实现 argo 工作流中调度 tfjob 时出现了一些问题。

argo_scheduler_tfjob

在argo停止工作流时,正在运行的 step2 中的 manager 监听了 TERM 信号,以便在工作流停止时同步停止 tfjob。但是事实情况却是 manager 退出了,但是没有收到任何的 TERM 信号。

二、问题剖析

检查这个问题的第一步是弄清楚 docker kill 背后发生了什么,官网的资料中有以下的描述:

Note: ENTRYPOINT and CMD in the shell form run as a subcommand of /bin/sh -c, which does not pass signals. This means that the executable is not the container’s PID 1 and does not receive Unix signals.

当我们用 sh 执行一段 shell script 时,在 shell script 中的可执行文件的 PID 不是1,并且 sh 也不会帮忙转发 TERM 信号,导致我们的可执行文件无法接收到终止信号,并执行清理逻辑。

我们的 manager 确实是用了一段 shell script 来启动的,可能就是因为这个原因导致无法收到 TERM 信号。

三、问题复现

我写了一段很简单的 go 程序,监听了 TERM 信号,然后打印一段文字。

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

    s, ok := <-sigs
    if !ok {
        log.Println("信号接收出错")
        os.Exit(1)
    }

    log.Println("收到信号:", s.String())
}

我的 Dockerfile 如下:

FROM alpine:latest
LABEL maintainr="jiangpengfei <jiangpengfei12@gmail.com>"

COPY main /usr/bin/main
COPY run.sh /usr/bin/run.sh
RUN chmod +x /usr/bin/main && chmod +x /usr/bin/run.sh

CMD ["sh", "-c", "/usr/bin/run.sh"]

run.sh 如下:

#!/bin/sh
/usr/bin/main

执行这个容器后,查看容器内的进程:

PID   USER     TIME  COMMAND
    1 root      0:00 {busybox} ash /usr/bin/run.sh
    6 root      0:00 /usr/bin/main
   12 root      0:00 sh
   17 root      0:00 ps

可以发现,run.sh 是 PID 为1, main 程序是6。此时我们使用 docker kill --signal TERM main_container_id 来停止容器,发现确实是没有反应的。因为 TERM 信号会发送给 PID 为 1 的进程。同时也因为 sh 不响应 TERM 信号,也不会转发该信号给子进程,所以容器也不会退出。如果我们使用 docker stop 退出的话,会发现很慢,这是因为 docker stop 会尝试先用 TERM 信号来终止进程,一段时间后发现没有退出的话再使用 KILL 信号。

四、解决方案

这个问题的解决方案有很多,要么让我们的程序进程成为 PID 1,要么让 PID 为 1 的进程转发这个 TERM 信号给我们的子进程。

方法一: 在 shell script 中使用 exec

将我们的 run.sh 改成如下:

#!/bin/sh
exec /usr/bin/main

然后再查看容器内的进程列表:

PID   USER     TIME  COMMAND
    1 root      0:00 /usr/bin/main
   11 root      0:00 sh
   16 root      0:00 ps

可以发现,main 进程的PID 是 1, 我们使用 docker kill --signal TERM main_container_id 来杀死进程,出现如下打印语句:

2020/01/17 23:46:24 收到信号: terminated

可见,exec 可以让我们的 main 进程成为 PID 为 1, 关于 exec 的作用描述如下:

The exec() family of functions replaces the current process image with a new process image.

即使用新进程的镜像替换当前进程的镜像数据,可以理解为exec系统调用并没有创建新的进程,只是替换了原来进程上下文的内容。原进程的代码段,数据段,堆栈段被新的进程所代替。这样我们的 main 进程就顺利成章的替换了 sh 进程成为 PID 为 1 的进程了。

方法二: 直接使用 main 作为镜像入口

这是最简单的方法了,但是很多时候会有限制,因为我们希望在 shell script 中写一些逻辑来调用程序。

方法三: 借助第三方程序

一些第三方的程序专门提供了这样的作用,以它们作为启动的入口,这些第三方程序会 watch 所有它产生的子进程,在这些子进程退出后自动退出,并且在其收到 TERM 信号后发送给子进程。

这里我们用 smell-baron 这个应用作为例子

修改 Dockerfile:

FROM alpine:latest
LABEL maintainr="jiangpengfei <jiangpengfei12@gmail.com>"

COPY main /usr/bin/main
COPY run.sh /usr/bin/run.sh
RUN chmod +x /usr/bin/main && chmod +x /usr/bin/run.sh
RUN wget -O /usr/bin/smell-baron https://github.com/insidewhy/smell-baron/releases/download/v0.4.2/smell-baron.musl && chmod +x /usr/bin/smell-baron

CMD ["/usr/bin/smell-baron", "/usr/bin/run.sh"]

查看容器内的进程:

PID   USER     TIME  COMMAND
    1 root      0:00 /usr/bin/smell-baron /usr/bin/run.sh
    6 root      0:00 /usr/bin/main
   14 root      0:00 sh
   19 root      0:00 ps

使用 docker kill 发现 main 收到了 TERM 信号。

1.Multiple commands can be run, smell-baron will exit when all the watched processes have exited.

2.Whether a spawned process is watched can be configured.

3.smell-baron can be told to signal all child processes on termination, this allows it to cleanly deal with processes that spawn a subprocess in a different process group then fail to clean it up on exit.

kubernetes存储–FlexVolume

简介

kubernetes 使用 volume 来满足它的存储需求,它支持很多的存储系统,比如 nfs、 glusterfs、cephfs等等,但是这些存储的实现方式有一个问题,就是它们的实现代码都必须合并到 Kubernetes 的代码中(称为 in-tree),这为 kubernetes 社区带来了维护上的成本。因此,kubernetes 提出了两种 out-of-tree 的方案: FlexVolume 和 csi。通过这两种方案实现的存储功能不必合并到 kubernetes 的代码仓库,由存储系统的供应商单独维护。

FlexVolume 是这篇文章主要关注的点,FlexVolume 自 1.2 版本开始就支持了。它使用基于 exec 的模型来与驱动程序对接。用户必须在每个节点(有些情况下包括主节点)上的预定义卷插件路径中安装 FlexVolume 驱动程序的可执行文件。当需要挂载 volume 的时候,由 kubelet 执行挂载命令来挂载即可。

基于 nfs 实现 FlexVolume

在探究 FlexVolume 的实现原理之前,我们可以先看一下官方提供的基于 nfs 的例子

注: 我这里是用 minikube 启动的本地 kubernetes 集群。

为了部署基于 nfs 实现的 FlexVolume,我们首先将目录下的 nfs 复制到 deploy 文件夹下

$ cp nfs deploy

然后将 deploy/deploy.sh 中的 dummy 修改成 nfs,表示我们使用的插件脚本是 nfs 这个可执行文件。

接着在 deploy 文件夹下构建 docker 镜像,这里要修改 Dockerfile,将 nfs COPY 到镜像中。然后执行下面的命令(镜像标签需要修改成你自己的):

$ docker build -t joyme/nfs-flexvolume:1.0 .
$ docker push joyme/nfs-flexvolume:1.0

镜像构建并推送完成之后,我们就开始部署了。因为 FlexVolume 要求将驱动文件放在指定的目录下,最粗暴的方式就是手动将文件 scp 到集群的每个节点上。这里为了方便,我们还可以使用 kubernetes 的 Daemenset,然后使用 hostPath 将文件放到主机之上。我们修改 deploy 文件夹下的 ds.yaml 这个部署文件。将我们刚刚推送的镜像填进去。然后执行以下命令进行部署。

$ kubectl apply -f ds.yaml

这里有个地方要注意, 默认的插件安装地址是 /usr/libexec/kubernetes/kubelet-plugins/volume/exec/, 但是 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 都可以修改这个值,如果你启动这些组件是指定了这些参数,那就需要修改 ds.yaml 中的路径。

在集群中部署完成之后,我们可以到某个节点上检查一下/usr/libexec/kubernetes/kubelet-plugins/volume/exec/是否存在我们刚刚部署的文件。

最后我们创建一个 nginx,挂载一个 FlexVolume。在创建之前,我们需要先启动一个 nfs server,这里为了方便,可以使用容器启动一个。

$ docker run -d --privileged --restart=always \
-v /tmp:/dws_nas_scratch \
-e NFS_EXPORT_DIR_1=/dws_nas_scratch \
-e NFS_EXPORT_DOMAIN_1=\* \
-e NFS_EXPORT_OPTIONS_1=ro,insecure,no_subtree_check,no_root_squash,fsid=1 \
-p 111:111 -p 111:111/udp \
-p 2049:2049 -p 2049:2049/udp \
-p 32765:32765 -p 32765:32765/udp \
-p 32766:32766 -p 32766:32766/udp \
-p 32767:32767 -p 32767:32767/udp \
fuzzle/docker-nfs-server:latest

使用官方提供的 nginx-nfs.yaml 文件,然后把其中的 server 地址修改一下,使用以下命令创建:

$ kubectl apply -f nginx-nfs.yaml

注意:如果出现错误,可以检查 node 上是否安装了 jq, nfs-common 等必要的依赖包。

实现原理

在完成上面例子的过程中,关于 FlexVolume 的大多数问题都比较好解答了。我们来看一下 nfs 的实现代码:

usage() {
    err "Invalid usage. Usage: "
    err "\t$0 init"
    err "\t$0 mount <mount dir> <json params>"
    err "\t$0 unmount <mount dir>"
    exit 1
}

err() {
    echo -ne $* 1>&2
}

log() {
    echo -ne $* >&1
}

ismounted() {
    MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`
    if [ "${MOUNT}" == "${MNTPATH}" ]; then
        echo "1"
    else
        echo "0"
    fi
}

domount() {
    MNTPATH=$1

    NFS_SERVER=$(echo $2 | jq -r '.server')
    SHARE=$(echo $2 | jq -r '.share')

    if [ $(ismounted) -eq 1 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    mkdir -p ${MNTPATH} &> /dev/null

    mount -t nfs ${NFS_SERVER}:/${SHARE} ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount ${NFS_SERVER}:${SHARE} at ${MNTPATH}\"}"
        exit 1
    fi
    log '{"status": "Success"}'
    exit 0
}

unmount() {
    MNTPATH=$1
    if [ $(ismounted) -eq 0 ] ; then
        log '{"status": "Success"}'
        exit 0
    fi

    umount ${MNTPATH} &> /dev/null
    if [ $? -ne 0 ]; then
        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"
        exit 1
    fi

    log '{"status": "Success"}'
    exit 0
}

op=$1

if ! command -v jq >/dev/null 2>&1; then
    err "{ \"status\": \"Failure\", \"message\": \"'jq' binary not found. Please install jq package before using this driver\"}"
    exit 1
fi

if [ "$op" = "init" ]; then
    log '{"status": "Success", "capabilities": {"attach": false}}'
    exit 0
fi

if [ $# -lt 2 ]; then
    usage
fi

shift

case "$op" in
    mount)
        domount $*
        ;;
    unmount)
        unmount $*
        ;;
    *)
        log '{"status": "Not supported"}'
        exit 0
esac

exit 1

其实就是一段 shell 脚本,支持三个命令: init、mount、unmount。当我们在集群中为某个 pod 挂载 FlexVolume时,该 pod 所在节点的 kubelet 会调用其指定的插件脚本执行 mount 命令,然后挂载给 pod 使用。当然了,FlexVolume 还支持更复杂的插件。这个可以看官方的文档: flexvolume

部署方案

关于如何部署 FlexVolume 的插件,其实在例子中也有提到,这里可以总结一下:

  • 手动部署到每个节点的指定目录下,比如我们刚刚部署的 nfs ,其实际路径是: /usr/libexec/kubernetes/kubelet-plugins/volume/exec/k8s~nfs。其中 /usr/libexec/kubernetes/kubelet-plugins/volume/exec 是默认路径,也可以通过 kubelet 的参数 --volume-plugin-dir 和 controller manager 的参数 --flex-volume-plugin-dir 来指定。k8s~nfs 这个路径中,k8s 是供应商, nfs 是驱动名称,在使用的时候可以这样指定: `driver: “k8s/nfs”。

  • 使用 kubernetes 的 deamonset 配合 hostPath 来部署,因为 daemonset 会在每个节点上都启动 pod,然后通过 hostPath 将插件放在指定的位置即可。kubernetes 集群中 master 节点可能被设置成不允许调度。这种情况下 daemonset 默认不调度到 master 节点上,可以使用 tolerations 来解决这个问题. 具体可参考: Scheduler is not scheduling Pod for DaemonSet in Master node

  • 其实除了 kubelet 要调用插件之外,controller-manager 也要调用。比如执行 init, attach, detach, waitforattach, isattached 等命令。

argo的输入输出源代码分析

简介

argo是一个工作流的调度引擎,支持 Steps 和 DAG 这两种工作流。

  • Steps: 是按照步骤,从前往后的工作流调度方案。工作流中的每一步都只依赖上一步的结果
  • DAG: 全称是 directed acyclic graph,译为有向无环图。与 Steps 的区别在于每一步可能依赖之前的多步输出,但是不会循环依赖(也就是无环)

不论是在什么类型的工作流上,argo都抽象出了两种输入输出:

  • parameters: 通常情况下都是字符串,该字符串可以来源于标准输出,也可以来源于文件的内容
  • artifacts: 可以理解成文件

输入输出是连接整个工作流的核心。每一步都可以看作是一次函数调用。那么在argo中,它是如何实现在多步之间输入输出的传输呢?下面会通过源代码进行分析。

在看代码之前,可以看一个 argo 的工作流中的一个pod,为了查看更方便,我删除一些不需要关注的字段:

$ kubectl -n workflow describe pods custom-workflow-111-2fw2f-2639432629

Name:           custom-workflow-111-2fw2f-2639432629
Namespace:      workflow
Labels:         pipeline.starx.com/nodeID=743
                workflows.argoproj.io/completed=true
                workflows.argoproj.io/workflow=custom-workflow-111-2fw2f
Annotations:    cni.projectcalico.org/podIP: 10.42.0.83/32
                workflows.argoproj.io/node-name: custom-workflow-111-2fw2f.yolov3-evaluate-743
                workflows.argoproj.io/outputs:
                  {"result":...
                workflows.argoproj.io/template:
                  {"name":"yolov3-evaluate-743","inputs":{"parameters":[{"name":"userParam","value":"eyJTY29yZVRocmVzaG9sZCI6MC41LCJJb3VfVGhyZXNob2xkIjowLjQ...
Controlled By:  Workflow/custom-workflow-111-2fw2f
Init Containers:
  init:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      init
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/inputs/artifacts from input-artifacts (rw)
      /argo/podmetadata from podmetadata (rw)
      /argo/staging from argo-staging (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
Containers:
  wait:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      wait
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/podmetadata from podmetadata (rw)
      /mainctrfs/argo/staging from argo-staging (rw)
      /mainctrfs/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /mainctrfs/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
      /var/run/docker.sock from docker-sock (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
  main:
    Image:         registry.cn-shanghai.aliyuncs.com/xinhuodev/wt:0.4
    Command:
      sh
    Args:
      /argo/staging/script
    Mounts:
      /argo/staging from argo-staging (rw)
      /tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
Volumes:
  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations
  docker-sock:
    Type:          HostPath (bare host directory volume)
    Path:          /var/run/docker.sock
    HostPathType:  Socket
  input-artifacts:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  argo-staging:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  default-token-lfk5b:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-lfk5b
    Optional:    false

我们需要关注的信息有:

  • Pod 的 Annotations
  • Init Containers 启动的初始化容器
  • Containers 中的 wait 容器和 main 容器
  • Pod 的 Volumes 和每个容器的 Mounts

Init 容器

argo 创建的 Pod 的初始化容器执行了 argoexec init 命令,从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go, 我们的分析也从这里开始。在执行 argo exec init之后,第一个调用的函数应该是loadArtifacts()。这个方法中做了三件事: initExecutor()wfExecutor.StageFiles()wfExecutor.LoadArtifacts()

initExecutor:

initExecutor 的代码如下(删除了不重要的代码):

func initExecutor() *executor.WorkflowExecutor {
    tmpl, err := executor.LoadTemplate(podAnnotationsPath)

    var cre executor.ContainerRuntimeExecutor
    switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
    case common.ContainerRuntimeExecutorK8sAPI:
        cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
    case common.ContainerRuntimeExecutorKubelet:
        cre, err = kubelet.NewKubeletExecutor()
    case common.ContainerRuntimeExecutorPNS:
        cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
    default:
        cre, err = docker.NewDockerExecutor()
    }

    wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
    yamlBytes, _ := json.Marshal(&wfExecutor.Template)
    return &wfExecutor
}

podAnnotationsPath加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations,这正好是 init 容器的挂载,而这个挂载对应的卷是:

 podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations

这里的 DownwardAPI 也解释一下,它是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。

接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor实例化了一个 wfExecutor

StageFiles()

源代码如下:

func (we *WorkflowExecutor) StageFiles() error {
    var filePath string
    var body []byte
    switch we.Template.GetType() {
    case wfv1.TemplateTypeScript:
        log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
        filePath = common.ExecutorScriptSourcePath
        body = []byte(we.Template.Script.Source)
    case wfv1.TemplateTypeResource:
        log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
        filePath = common.ExecutorResourceManifestPath
        body = []byte(we.Template.Resource.Manifest)
    default:
        return nil
    }
    err := ioutil.WriteFile(filePath, body, 0644)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return nil
}

职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script。这就是我们在 main 容器中执行的脚本了。

LoadArtifacts

// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
    for _, art := range we.Template.Inputs.Artifacts {
        artDriver, err := we.InitDriver(art)

        var artPath string
        mnt := common.FindOverlappingVolume(&we.Template, art.Path)
        if mnt == nil {
            artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
        } else {
            // If we get here, it means the input artifact path overlaps with an user specified
            // volumeMount in the container. Because we also implement input artifacts as volume
            // mounts, we need to load the artifact into the user specified volume mount,
            // as opposed to the `input-artifacts` volume that is an implementation detail
            // unbeknownst to the user.
            log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
            artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
        }

        // The artifact is downloaded to a temporary location, after which we determine if
        // the file is a tarball or not. If it is, it is first extracted then renamed to
        // the desired location. If not, it is simply renamed to the location.
        tempArtPath := artPath + ".tmp"
        err = artDriver.Load(&art, tempArtPath)
        if err != nil {
            return err
        }
        if isTarball(tempArtPath) {
            err = untar(tempArtPath, artPath)
            _ = os.Remove(tempArtPath)
        } else {
            err = os.Rename(tempArtPath, artPath)
        }

        if art.Mode != nil {
            err = os.Chmod(artPath, os.FileMode(*art.Mode))
        }
    }
    return nil
}

InitDriver是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。

FindOverlappingVolume 是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts 作为 artifacts 的基础路径。否则使用 /mainctrfs 作为路径。

下面就是下载文件,解压文件并修改权限了。

注意在这里,init、wait和main容器都挂载了input-artifactsargo-staging,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。

wait 容器

wait容器的职责有以下几点:

  • 等待 main 容器结束
  • 杀死 sidecar
  • 保存日志
  • 保存 parameters
  • 保存 artifacts
  • 获取脚本的输出流
  • 将输出放在 Annotations 上

下面我们看这些功能点的实现:

等待 main 容器结束

// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
    // WaitInit() 是初始化操作,只有 PSN 需要
    err := we.RuntimeExecutor.WaitInit()
    if err != nil {
        return err
    }
    log.Infof("Waiting on main container")
    // waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
    // 不为空说明启动了
    mainContainerID, err := we.waitMainContainerStart()
    if err != nil {
        return err
    }
    log.Infof("main container started with container ID: %s", mainContainerID)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // monitorAnnotations是因为pod的annotations会更改
    annotationUpdatesCh := we.monitorAnnotations(ctx)
    // 超时会杀死
    go we.monitorDeadline(ctx, annotationUpdatesCh)

    // 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
    err = we.RuntimeExecutor.Wait(mainContainerID)
    if err != nil {
        return err
    }
    log.Infof("Main container completed")
    return nil
}

杀死 sidecar

main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
    if len(we.Template.Sidecars) == 0 {
        log.Infof("No sidecars")
        return nil
    }
    log.Infof("Killing sidecars")
    pod, err := we.getPod()
    if err != nil {
        return err
    }
    sidecarIDs := make([]string, 0)
    // 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
    for _, ctrStatus := range pod.Status.ContainerStatuses {
        if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
            continue
        }
        if ctrStatus.State.Terminated != nil {
            continue
        }
        containerID := containerID(ctrStatus.ContainerID)
        log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
        sidecarIDs = append(sidecarIDs, containerID)
    }
    if len(sidecarIDs) == 0 {
        return nil
    }
    return we.RuntimeExecutor.Kill(sidecarIDs)
}

保存日志

argo 是支持将 main 容器中的日志持久化并保存到指定的地方的(s3, hdfs, Artifactory)。这在 argo 的文档上好像没有提到过。这一部分的逻辑比较简单,就是通过 ContainerRuntime 获取获取容器中的输出流,然后存成文件,通过 argo 中的 storage driver 保存下来。

保存 parameters

// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters() error {
    if len(we.Template.Outputs.Parameters) == 0 {
        log.Infof("No output parameters")
        return nil
    }
    log.Infof("Saving output parameters")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    // 遍历模板参数
    for i, param := range we.Template.Outputs.Parameters {
        log.Infof("Saving path output parameter: %s", param.Name)
        // Determine the file path of where to find the parameter
        if param.ValueFrom == nil || param.ValueFrom.Path == "" {
            continue
        }

        var output string
        if we.isBaseImagePath(param.ValueFrom.Path) {
            log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
            // 容器内,通过 runtime 获取
            output, err = we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
            if err != nil {
                return err
            }
        } else {
            log.Infof("Copying %s from from volume mount", param.ValueFrom.Path)
            mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
            // 容器的挂载卷,直接获取
            out, err := ioutil.ReadFile(mountedPath)
            if err != nil {
                return err
            }
            output = string(out)
        }

        outputLen := len(output)
        // Trims off a single newline for user convenience
        if outputLen > 0 && output[outputLen-1] == '\n' {
            output = output[0 : outputLen-1]
        }
        // 保存下来
        we.Template.Outputs.Parameters[i].Value = &output
        log.Infof("Successfully saved output parameter: %s", param.Name)
    }
    return nil
}

保存 artifacts

保存 artifacts 和 保存 parameters 的操作是一样的。

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
    if len(we.Template.Outputs.Artifacts) == 0 {
        log.Infof("No output artifacts")
        return nil
    }
    log.Infof("Saving output artifacts")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    err = os.MkdirAll(tempOutArtDir, os.ModePerm)
    if err != nil {
        return errors.InternalWrapError(err)
    }

    for i, art := range we.Template.Outputs.Artifacts {
        err := we.saveArtifact(mainCtrID, &art)
        if err != nil {
            return err
        }
        we.Template.Outputs.Artifacts[i] = art
    }
    return nil
}

获取脚本的输出流

直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中

func (we *WorkflowExecutor) CaptureScriptResult() error {
    if we.Template.Script == nil {
        return nil
    }
    log.Infof("Capturing script output")
    mainContainerID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }
    reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
    if err != nil {
        return err
    }
    defer func() { _ = reader.Close() }()
    bytes, err := ioutil.ReadAll(reader)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    out := string(bytes)
    // Trims off a single newline for user convenience
    outputLen := len(out)
    if outputLen > 0 && out[outputLen-1] == '\n' {
        out = out[0 : outputLen-1]
    }
    we.Template.Outputs.Result = &out
    return nil
}

将输出放在 Annotations 上

将 outputs 存在 pod 的 annotations 上。

func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
    outputs := we.Template.Outputs.DeepCopy()
    if logArt != nil {
        outputs.Artifacts = append(outputs.Artifacts, *logArt)
    }

    if !outputs.HasOutputs() {
        return nil
    }
    log.Infof("Annotating pod with output")
    outputBytes, err := json.Marshal(outputs)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

总结

init 容器做了 pod 的初始化,包括存储 script,下载 artifacts等等,这样我们的 main 容器就不用关心输入的来源,只需要在指定地方使用即可。wait 容器负责监控 main 容器的生命周期,在 main 容器中的主要逻辑运行结束之后,负责将输出部分读取,持久化,这样 main 容器就不用操心如何将该步产生的结果传到后面的步骤上的问题。

VXLAN网络基础

简介

VXLAN 全称 Virtual eXtensible Local Area Network, 是一种基于三层网络构建虚拟的二层网络的方案。它使用 UDP 封装二层的数据帧,实现了 overlay 网络。所有处于 overlay 网络中的设备均感觉不到底层和传统网络的差别。

相关知识点

OSI七层网络模型

OSI 的七层网络模型从下到上依次是: 物理层,数据链路层,网络层,传输层、会话层、表示层、应用层

我们在简介中提到的二层、三层都是这七层中的,二层是数据链路层,它主要抽象了根据mac地址来传输数据帧这一过程。三层是网络层,典型的是ipv4, ipv6这样的网络协议,根据 ip 地址来传输 ip 数据报。

注意虽然 VXLAN 是基于 UDP 封装了数据帧,但是我们一般说它是基于三层而不是四层。因为在这里我们关注的是数据的是如何传输到指定地址的,而不是如何封装的。

overlay 的含义

overlay 字面含义就是上层的,还有一个对应的词,也就是underlay。结合在一起就好理解了,
VXLAN 是 overlay 网络,说的是它实现的二层(数据链路层)是 overlay 的,这二层是基于三层(网络层)的 underlay 网络。

单播和多播

下面的定义来源于维基百科

  • 单播: 英文 unicast, 是指数据包在计算机网络的传输中,目的地址为单一目标的一种传输方式。它是现今网络应用最为广泛,通常所使用的网络协议或服务大多采用单播传输,例如一切基于TCP的协议。

  • 多播(组播): 英文 multicast,是指把信息同时传递给一组目的地址。它使用的策略是最高效的,因为消息在每条网络链路上只需传递一次,且只有在链路分叉的时候,消息才会被复制。

数据帧

以常见的 EthernetII 帧为例,其帧格式如下:

ethernetII frame

D.MAC: 6byte,目标 MAC 地址
S.MAC: 6byte, 来源 MAC 地址
Type: 2byte, 0x0800是 IP 类型,0x0806 是 ARP 类型
Data: 数据
FCS: 为了进行差错检验而添加的冗余码。

以下是我用 wireshark 抓的 arp 帧:

arp in wireshark

我在笔记本(192.168.31.243)上 ping 了 192.168.31.133 这个地址,因为我的笔记本不知道 192.168.31.133 的mac地址,因此使用 arp 帧来查找目的mac地址。

VLAN

VLAN(Virtual Local Area Network) 和本文介绍的 VXLAN 从名称上看就很相似,中文名称叫做虚拟局域网,它们的作用也是一样的,可以用来划分子网。下面采用维基百科上关于虚拟局域网的介绍。

虚拟区域网络(Virtual Local Area Network或简写VLAN, V-LAN)是一种建构于局域网交换技术(LAN Switch)的网络管理的技术,网管人员可以借此透过控制交换机有效分派出入局域网的报文到正确的出入端口,达到对不同实体局域网中的设备进行逻辑分群(Grouping)管理,并降低局域网内大量数据流通时,因无用报文过多导致壅塞的问题,以及提升局域网的信息安全保障。

但是 VLAN 是基于二层的方案,它会在数据帧头部添加4个字节的 VLAN Tag,其中 12bit 用来标识不同的二层网络,这样总共是 4000 多个。其次 VLAN 会使用 MAC 地址表来记录 VLAN ID、 MAC 和 Port 这三者之间的关系,因此一旦网络中主机数量多起来,会导致 MAC 地址表占用很大的内存。 关于 VLAN 和 VXLAN的区别,可以参考这篇文章: VXLAN vs VLAN

VXLAN 协议

vxlan protocol

上图从整体上来看,是一个 UDP 的报文,在 UDP 的数据部分的前8位是 VXLAN Header,表明这个 UDP 封装的是 VXLAN 的数据帧,后面则是原始的2层数据帧了。在 VXLAN Header中,有下面几个字段:

  • VXLAN RRRR1RRR: VXLAN 的标记位
  • Reserved: 保留位
  • VNID: 24位的 VNI 字段
  • Reserved: 保留字段

VXLAN 的实现原理

VXLAN 将以太网数据帧封装在 UDP 内,进而在三层网络传输。VXLAN 数据的封装和解封发生在 VTEP(VXLAN Tunnel EndPoint)。VTEP 是 VXLAN 网络的边缘设备。同时每个 VXLAN 网络都有唯一的 VNI(VXLAN Network Identifier) 标识,这样在一个物理网络上可以构建多个 VXLAN 虚拟网络,满足多租户的要求。下图是 VXLAN 的网络架构示意图。

vxlan

这里面有两个比较重要的概念:

  • VTEP: VTEP 和传统交换机类似,也是基于 MAC 地址表工作,是 VXLAN 网络的边缘设备,用来对 VXLAN 报文封包和解包。VTEP 可以是网络设备(比如交换机),也可以是一台机器(比如虚拟化集群中的宿主机)。在 VTEP 中,可以认为有两个表: 一个是 VLAN 和 VXLAN 的对应关系表;另一个是 MAC 地址表,里面包含了很多 MAC 地址,VXLAN ID 和远端 VTEP IP 地址的对应关系。 VTEP 收到下面主机的网络数据帧时,会先根据 VLAN 查第一个表获取对应的 VXLAN ID,之后根据 VXLAN ID和目的 MAC 地址,查 MAC 地址表获取远端 VTEP 的 IP 地址。最后, VTEP 会剥离VLAN Tag,按照 VXLAN 格式封装数据帧,发往远端的 VTEP。远端的 VTEP 收到该数据后进行解包,根据 MAC 地址将数据帧发往其所连接的主机。

  • VNI: VNI 是每个VXLAN的标识,也就是上面说的 VXLAN ID,共24位,那么就可以表示 2^24=16777216 个 VXLAN 网络。每个 VXLAN ID 对应一个租户,那么理论上可以支撑千万级别的租户。

vxlan-vtep

图例: VXLAN VTEP

这里又引出另外一个问题, VXLAN 中的一台主机在只知道 ip 的情况下,如何获取对方的 MAC 地址。在传统网络中,ARP 请求是用来解决这个问题的。

  • VXLAN 网络中,主机发出的 ARP 请求会被 VTEP(1) 接收到,VTEP(1) 发现虚拟机目的 MAC 为广播地址,封装上 VXLAN 协议头部之后,发送给多播组,支持多播的底层网络设备会把报文发送给组内的所有成员
  • VTEP(2) 接收到 VXLAN 封装的 ARP 请求,去掉 VXLAN 头部,并通过报文学习到发送方 <虚拟机MAC-VNI-VTEP IP>这个对应关系,并把原来的 ARP 报文广播给主机。
  • 主机接受到 ARP 请求报文,如果 ARP 报文请求的是自己的 MAC 地址,就返回 ARP 应答
  • VTEP(2) 此时已经知道发送方的虚拟机 MAC 和 VTEP 信息,把 ARP 应答添加上 VXLAN 头部之后通过单播发送出去
  • VTEP(1)接收到报文,并学习到报文中的的对应关系,记录下来。然后 VTEP 进行解包,知道内部的 IP 和 MAC 地址,并转发给虚拟机。
  • 虚拟机拿到 ARP 应答报文,就知道了对方 IP 对应的 MAC 地址。

在这次多播之后,两台虚拟机之间的通信就可以通过单播了。VTEP 在这中间担任了一个代理的角色,使得虚拟机之间可以透明的进行网络通信。这和 nginx 担任反向代理的角色有点类似。同时我们可以发现,在一个大规模的 VXLAN 网络中,多播会是一件很消耗性能的事。

资料地址

VXLAN vs VLAN

VXLAN in OpenStack Neutron

VXLAN 协议原理简介

linux 上实现vxlan网络

linux ip 命令的使用

简介

linux 下的 ip 命令是一个很强大的工具,在这之前,我通常只会使用 ifconfig 命令来查看本机网络接口和 ip 地址等等。或者 netstat 命令查看端口占用等等。ip 命令属于 iproute2 套件中的一个命令,关于 iproute2 和 linux net-tools 中的命令对比如下(图片来源:https://linux.cn/article-3144-1.html):

net-tools vs iproute2

可以看出,除了部分 netstat 命令用 ss 来替代,其它都可以用 ip 命令替代。并且,iproute2 已经是大多数 linux 发行版默认安装了,而 net-tools 则需要另外安装。

ip 命令可以分为下面几个模块:

  • 网卡设备相关: ip link
  • 网卡地址相关: ip addr
  • 路由表相关: ip route
  • arp 相关: ip neigh

下面会列出一些常用的操作,最好在虚拟机中操作,防止影响个人机器。

ip link

查看 ip link 的帮助

$ ip link help

查看网络接口

$ ip link list

1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP mode DEFAULT group default qlen 1000
    link/ether 52:54:00:8a:fe:e6 brd ff:ff:ff:ff:ff:ff
3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP mode DEFAULT group default qlen 1000
    link/ether 08:00:27:15:ee:5c brd ff:ff:ff:ff:ff:ff

这里显示了三个网络接口,lo代表的本机的回环网卡,eth0eth1 分别是两个网卡

添加网络接口

$ sudo ip link add link eth0 mydev type bridge

这里添加了一个网桥,连接在 eth0 上。使用 ip link list 查看可以发现多了下面一个设备

6: mydev: <BROADCAST,MULTICAST> mtu 1500 qdisc noop state DOWN mode DEFAULT group default qlen 1000
    link/ether 5e:0c:36:7b:ce:0d brd ff:ff:ff:ff:ff:ff

删除网络接口

$ sudo ip link delete link dev mydev

关闭网络接口

$ sudo ip link set eth1 down

打开网络接口

$ sudo ip link set eht1 up

ip addr

查看帮助

$ ip addr help

查看网络地址

$ ip addr list

查看某一个网络接口的地址

$ ip addr show eth1

添加 ip 地址

$ sudo ip addr add 192.168.31.131/24 dev eth1

查看 eth1 的地址

$ ip addr show eth1

3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 08:00:27:15:ee:5c brd ff:ff:ff:ff:ff:ff
    inet 192.168.31.77/24 brd 192.168.31.255 scope global noprefixroute dynamic eth1
       valid_lft 42769sec preferred_lft 42769sec
    inet 192.168.31.131/24 scope global secondary eth1
       valid_lft forever preferred_lft forever
    inet6 fe80::a00:27ff:fe15:ee5c/64 scope link 
       valid_lft forever preferred_lft forever

我们也可以 ping 一下这个地址:

$ ping 192.168.31.131

PING 192.168.31.131 (192.168.31.131) 56(84) bytes of data.
64 bytes from 192.168.31.131: icmp_seq=1 ttl=64 time=0.109 ms
64 bytes from 192.168.31.131: icmp_seq=2 ttl=64 time=0.155 ms

删除 ip 地址

$ sudo ip addr del 192.168.31.131/24 dev eth1

改变设备地址的配置

这里有一篇很好的文章: understanding ip addr change and ip addr replace commands

为了演示的方便,我添加了一个网卡设备

$ sudo ip link add link eth0 name dummy0 type dummy

为它分配地址:

$ sudo ip addr add 192.168.31.132/24 dummy0
$ ip addr show dummy0

5: dummy0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN group default qlen 1000
    link/ether 9e:dc:6e:0b:70:99 brd ff:ff:ff:ff:ff:ff
    inet 192.168.31.132/24 scope global dummy0
       valid_lft forever preferred_lft forever

如果你想要修改 valid_lftpreferred_lft 配置,可以使用 ip change命令:

$ sudo ip addr change 192.168.31.132 dev dummy0 preferred_lft 300 valid_lft 300
$ ip addr show dummpy0

5: dummy0: <BROADCAST,NOARP> mtu 1500 qdisc noop state DOWN group default qlen 1000
    link/ether 9e:dc:6e:0b:70:99 brd ff:ff:ff:ff:ff:ff
    inet 192.168.31.132/24 scope global dynamic dummy0
       valid_lft 299sec preferred_lft 299sec

ip route

查看帮助

$ ip route help

查看路由

$ ip route list

添加路由

添加一条普通的路由

$ sudo ip route add 39.156.0.0/16 via 192.168.31.133 dev dummy0

添加默认路由

$ sudo ip route add default via 192.168.31.133 dev dummy0

删除路由

删除默认路由

$ sudo ip route del default via 192.168.31.133 dev dummy0

删除普通路由

$ sudo ip route del 39.156.0.0/16 via 192.168.31.133 dev dummy0 

**查看一个 ip 地址的路由包来源

$ ip route get 39.156.69.79

39.156.69.79 via 10.0.2.2 dev eth0 src 10.0.2.15 
    cache 

ip neigh

查看帮助

$ ip neigh help

查看同一个网络的邻居设备

$ ip neigh show

192.168.31.1 dev eth1 lladdr 34:ce:00:2e:88:b9 STALE
10.0.2.2 dev eth0 lladdr 52:54:00:12:35:02 REACHABLE
10.0.2.3 dev eth0 lladdr 52:54:00:12:35:03 STALE

Kubernetes Pod 解析

pod 基础概念

在 Kubernetes 中, Pod 是一个非常重要的概念,它由一个或多个容器组成,这些容器共享存储、网络、进程空间,以及可以使用进程间通信。

Pod 是集群中最小的调度单位,如果把 Kubernetes 集群比作操作系统,那么 Pod 则是一个进程。一个 Pod 被创建出来之后,它会被调度到集群中的某一个节点上开始运行,Pod 中的 Container 都会在该节点上启动。

Pod 是短暂的,就跟进程一样,在被创建之后可能会随时被终止。但是 Kubernetes 会根据需求来的及时的重新创建一个 Pod,所以单独从 Pod 的层面来说,它应该是一个无状态的应用。

集群中的每个 Pod 都会有唯一的 ID (UID),这跟进程的进程号是唯一的一样。

共享命名空间

这里以 docker 在 linux 下的实现为例,docker 主要使用了 linux namespace 做的资源隔离。在 pod 中,所有的 docker 容器都可以共享同一个 network, ipc, pid命名空间,并且可以通过挂载同一个卷的方式来共享文件系统。需要注意的是,默认情况下只有 network 这个命名空间是开启的,其他的需要通过 shareProcessNamespaceSYS_PTRACEemptyDir 等字段来开启。

为了说明,可以在 kubernetes 集群中创建下面这个 pod

apiVersion: v1
kind: Pod
metadata:
  name: nginx
spec:
  shareProcessNamespace: true
  containers:
  - name: nginx
    image: nginx
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
  - name: shell
    image: busybox
    volumeMounts:
      - mountPath: /cache
        name: cache-volume
    securityContext:
      capabilities:
        add:
        - SYS_PTRACE
    stdin: true
    tty: true
  volumes:
  - name: cache-volume
    emptyDir: {}

上面创建的 Pod 有两个容器,一个是 nginx,另一个是 shell。我们使用以下命令进入到 shell 容器中。

kubectl exec -it nginx -c shell sh

network

为了验证同一个 Pod 下 network 是共享的,可以使用以下命令验证

$ wget localhost
Connecting to localhost (127.0.0.1:80)
saving to 'index.html'
index.html           100% |******************************************|   612  0:00:00 ETA
'index.html' saved

很明显,这里的 localhost 指向了 nginx 容器。

pid

$ ps -el
PID   USER     TIME  COMMAND
    1 root      0:00 /pause
    6 root      0:00 nginx: master process nginx -g daemon off;
   11 101       0:00 nginx: worker process
   12 root      0:00 sh
   20 root      0:00 sh
   26 root      0:00 ps

在 shell 容器中查看进程可以看到 /pause 和 nginx 等进程。因为共享了 pid 命名空间,所以可以看到其他容器的进程。这里的 pause 是一个很特殊的进程,在后文章会单独解释。

ipc

$ kill -9 11
$ ps -el
PID   USER     TIME  COMMAND
    1 root      0:00 /pause
    6 root      0:00 nginx: master process nginx -g daemon off;
   12 root      0:00 sh
   20 root      0:00 sh
   29 101       0:00 nginx: worker process
   30 root      0:00 ps -el

接着上面的命令,我们杀死了 nginx 的 worker 进程,nginx master 进程又重启了 worker,重启后的 worker PID 是 29。可以在 shell 容器中使用信号杀死 nginx 中的进程,说明 IPC 命名空间是共享的。

shared volume

$ cd cache
$ touch test
$ kubectl exec -it nginx -c nginx sh
$ ls /cache
test

我们在 shell 容器中 cache 文件夹下创建了文件 test, 在 nginx 容器中也能看到,说明两个容器可以共享文件系统的某些目录。

容器探针

之所以特地提到容器探针是因为容器探针是一个非常好的检查服务是否正确运行的方式。

TODO: 几种探针的使用场景和

探针是由 kubelet 周期性对容器执行的诊断措施。kubernetes 提供了三种方式:

  • ExecAction: 在容器中执行命令,如果命令的 exit code 是 0 则代表成功。
  • TCPSocketAction: 在容器的ip和端口上执行 tcp 连接检查,如果端口是打开的则表明诊断成功。
  • HTTPGetAction: 在容器的ip和制定端口和路径上执行,如果返回的状态码大于等于 200 ,小于400就表明成功。

直到 kubernetes v1.16 止,共有三种探针可以使用,分别是 livenessProbe, readinessProbe, startupProbe.

  • livenessProbe: 检查容器是否在运行,如果 liveness 探针失败,kubelet 会杀死这个容器,这个容器会遵循它的重启策略。如果一个容器没有提供 liveness 探针,默认状态是 Success

  • readinessProbe: 表明容器是否准备好接收请求了。如果 readiness 探针失败,endpoints 控制器会从符合这个 Pod 的所有 service 的 endpoint 列表中移除该 Pod。默认状态是 Failure。如果容器没有提供 readiness 探针,默认状态就是 Success

  • startupProbe: 表明容器中的应用是否启动完成。如果提供了 startup 探针,其他的探针都被禁用直到 startup 探针成功。如果 startup 探针失败,kuberlet 杀死容器,容器会遵循它的重启策略。是否容器没有提供 startup 探针,默认状态是 Success

init container

我们都知道 Pod 可以有多个容器,其中 init 容器是比较特殊的一个,它由 spec.initContainers 指定,与普通容器不同的是,只有在 init 容器运行完成之后,Kubernetes 才会初始化 Pod 和运行应用容器。

实际应用中,init 容器的职责基本上都是和它名字描述的一样,用来做初始化用。比如在 argo 这个工作流调度应用中,它会为每个调度的 Pod 初始化一个 init 容器,用来载入该步骤需要使用的文件资源等等。

pause container

在上面提到了 pid namespace 共享中,有一个 PID 为 1 的 Pause 进程,这就是现在提到的 pause container,pause container 对 kubernetes 用户是不感知的。但是我们在 kubernetes 节点上使用 docker ps来查看,会发现很多的 pause 容器。pause 容器的作用主要有两点:

  • 在 pod 中作为 linux namespace 共享的基础容器
  • 在 PID namespace 共享的前提下,作为每个 pod 中的PID 1,然后回收僵尸进程

为了研究pause的作用,可以在电脑上执行以下的命令:

docker run -d --ipc=shareable --name pause -p 8080:80 warrior/pause-amd64:3.0

docker run -d --name nginx -v /home/jiang/projects/testk8s/nginx.conf:/etc/nginx/nginx.conf --net=container:pause --ipc=container:pause --pid=container:pause nginx

docker run -d --name ghost --net=container:pause --ipc=container:pause --pid=container:pause ghost

nginx.conf 如下:

error_log stderr;
events {worker_connections 1024;}
http {
    access_log /dev/stdout combined;
    server {
        listen 80 default_server;
        server_name example.com;
        location / {
            proxy_pass http://127.0.0.1:2368;
        }
    }
}

我们首先启动了一个 pause 容器,并且开始了 ipc 的共享。然后又启动了 nginx 和 ghost 容器,并且这两个容器都加入了 pause 的network、ipc和pid命名空间。

在浏览器中打开地址: http://localhost:8080, 发现打开了 ghost 博客网页。我们在容器 pause 中开启的 8080 端口,然后经过 nginx 容器代理到了 ghost 容器。我们的应用容器 pause 容器完成了命名空间的共享。

我们再来看一下 pause 的代码:

/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static void sigdown(int signo) {
  psignal(signo, "Shutting down, got signal");
  exit(0);
}

static void sigreap(int signo) {
  while (waitpid(-1, NULL, WNOHANG) > 0);
}

int main() {
  if (getpid() != 1)
    /* Not an error because pause sees use outside of infra containers. */
    fprintf(stderr, "Warning: pause should be the first process\n");

  if (sigaction(SIGINT, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 1;
  if (sigaction(SIGTERM, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 2;
  if (sigaction(SIGCHLD, &(struct sigaction){.sa_handler = sigreap,
                                             .sa_flags = SA_NOCLDSTOP},
                NULL) < 0)
    return 3;

  for (;;)
    pause();
  fprintf(stderr, "Error: infinite loop terminated\n");
  return 42;
}

这段代码在监听了三个信号量,在 SIGINT 和 SIGTERM 时调用 sigdown() 来退出。在接收到 SIGCHLD 信号时使用 waitpid,因为 pause 进程的 PID 是1,所以所有的僵尸进程都会被挂到 pause 进程之下,因此 waitpid 可以回收僵尸进程。

multi containers design pattern

在大多数情况下,Pod 往往只有一个容器,因为一个 Pod 的职责是唯一的。但是同样的,也有一些值得借鉴的多容器设计模式。

常用的模式有三种: sidecar, adapter, ambassador, 下图是常见的三种设计模式图,图片来源于网络:

multi container pod design

sidecar 模式

在 sidecar 模式中,通常有一个主要的容器A–比如我们的 web 应用,然后有另外一个重要的容器B,负责处理 A 容器的一些功能,但是 B 容器又不是必须的。这个 B 容器我们通常称它为 sidecar 容器。

常见的 sidecar 容器有 日志,同步服务,监控等职责。当应用容器不在运行时,日志容器的运行是没有意义的,所以我们通常会创建一个 Pod 包含主要的容器和一个 sidecar 容器,来协同工作。这样的好处就是减少应用容器的功能需求,将通用的功能交给 sidecar 容器去执行,而又不会侵入应用容器。

adapter 模式

adapter 模式就是程序设计中常用的适配器模式,负责将应用容器中一些不兼容的功能调整成兼容的格式。比如一个大型系统中有很多小的系统,每个系统输出的日志格式都不同。而我们的统一监控系统只接受一种日志格式。这时候就可以使用 adapter 模式在 Pod 的加入一个负责适配的容器,将各种格式的日志调整成相同的统一发送给日志系统。

ambassador 模式

ambassador 模式常用来将应用容器连接到容器之外的网络。比如数据库,我们的应用容器只负责连接 localhost 的地址,然后由 ambassador 容器判断当前的环境,将应用容器的数据库请求代理到不同的数据库上。这样,我们在开发环境,测试环境,生产环境都只需要一套配置。

pod lifecycle

pod 的状态包含一个 phase 属性,这个属性用来描述当前 pod 的状态,可能的值有:

  • Pending: Pod 已经被 Kubernetes 系统接受,但是有一个或多个容器镜像尚未创建。等待时间包括调度 Pod 的时间和通过网络下载镜像的时间。
  • Running: Pod 已经绑定到一个节点上,Pod 中所有的容器都已经被创建,至少有一个容器正在运行,或者正处于启动或重启状态。
  • Succeeded: Pod中的所有容器都被成功终止,并且不会再重启。
  • Failed: Pod中所有容器都已经终止,并且至少有一个容器是因为失败终止。也就是说,容器以非0状态退出或被系统终止。
  • Unknown: 因为某些原因无法取得 Pod 的状态,通过是因为与 Pod 所在的主机通信失败。

下图是一个 Pod 的生命周期状态,图片来源于网络:

kubernetes-pod-life-cycle

在 Pod 的整个生命周期中,我们可以通过容器的生命周期钩子来在某些阶段处理一些工作。

  • PostStart: 当容器被创建的时候,这个钩子会立刻执行。

  • PreStop: 当容器退出时执行

在两种钩子触发时我们可以选择调用脚本执行还是发送HTTP请求。

参考资料

Pods-Kubernetes

Pod状态与生命周期管理

The Almighty Pause Container

The Distributed System Toolkit: Patterns for Composite Containers

Container Design Patterns

Multi-Container Pod Design Patterns in Kubernetes