Golang 中的错误处理建议

一、概述

Golang 的错误处理一直是一个比较讨论比较多的话。我刚接触 Golang 的时候也看过关于错误处理的一些文档,但是并没有放在心上。在我使用 Golang 一段时间之后,我觉得我可能无法忽略这个问题。因此,这篇文章主要是为了整理一些在 Golang 中常用的错误处理技巧和原则。

二、错误处理的技巧和原则

2.1 使用封装来避免重复的错误判断

在 Golang 的项目中,最多的一句代码肯定是 if err != nil。Golang 将错误作为返回值,因此你不得不处理这些错误。但是有的时候,错误处理的判断可能会占据你的代码的一半篇幅,这使得代码看起来乱糟糟的。在官方的博客中有一个这样的例子:

_, err = fd.Write(p0[a:b])
if err != nil {
    return err
}
_, err = fd.Write(p1[c:d])
if err != nil {
    return err
}
_, err = fd.Write(p2[e:f])
if err != nil {
    return err
}
// and so on

是的,你没有看错,这里其实就是调用了 3 行 fd.Write,但是你不得不写上 9 错误判断。因此官方的博客中也给出了一个比较优雅的处理方案:将 io.Writer 再封装一层。

type errWriter struct {
    w   io.Writer
    err error
}

func (ew *errWriter) write(buf []byte) {
    if ew.err != nil {
        return
    }
    _, ew.err = ew.w.Write(buf)
}

ew := &errWriter{w: fd}
ew.write(p0[a:b])
ew.write(p1[c:d])
ew.write(p2[e:f])
// and so on
if ew.err != nil {
    return ew.err
}

现在看上去就好多了,write(buf []byte) 方法在内部判断了错误值,来避免在外面多次的错误判断。当然,这种写法可能也有它的弊端,比如你没有办法知道出错在哪一行调用。大多数情况下,你只需要检查错误,然后进行处理而已。因此这种技巧还是很有用的。Golang 的标准库也有很多类似的技巧。比如

b := bufio.NewWriter(fd)
b.Write(p0[a:b])
b.Write(p1[c:d])
b.Write(p2[e:f])
// and so on
if b.Flush() != nil {
    return b.Flush()
}

其中 b.Write 是有错误值返回的,这只是为了符合 io.Writer 接口。你可以在调用 b.Flush() 的时候再进行错误值的判断。

2.2 Golang 1.13 前的错误处理

检验错误

大多数情况下,我们只需要对错误进行简单的判断即可。因为我们不需要对错误做其他的处理,只需要保证代码逻辑正确执行即可。

if err != nil {
    // something went wrong
}

但有的时候我们需要根据错误类型进行不同的处理,比如网络连接未连接/断开导致的错误,我们应该在判断是未连接/断开时,进行重连操作。 在涉及到错误类型的判断时,我们通常有两种方法

  1. 将错误和已知的值进行比对
var ErrNotFound = errors.New("not found")

if err == ErrNotFound {
    // something wasn't found
}
  1. 判断错误的具体类型
type NotFoundError struct {
    Name string
}

func (e *NotFoundError) Error() string { return e.Name + ": not found" }

if e, ok := err.(*NotFoundError); ok {
    // e.Name wasn't found
}

添加信息

当一个错误在经过多层的调用栈向上返回时,我们通常会在这个错误上添加一些额外的信息,以帮助开发人员判断错误出现时程序运行到了哪里,发生了什么。最简单的方式是,使用之前的错误信息构造新的错误:

if err != nil {
    return fmt.Errorf("decompress %v: %v", name, err)
}

使用 fmt.Errorf 只保留了上一个错误的文本,丢弃了其他所有的信息。如果我们想保留上一个错误的所有信息,我们可以使用下面的方式:

type QueryError struct {
    Query string
    Err   error
}

if e, ok := err.(*QueryError); ok && e.Err == ErrPermission {
    // query failed because of a permission problem
}

2.3 Golang 1.13 中的错误处理

Golang 1.13 中,如果一个错误包含了另一个错误,则可以通过实现 Unwrap() 方法来返回底层的错误。如果 e1.Unwrap() 返回了 e2,我们就可以说 e1 包含了 e2。

使用 Is 和 As 来检验错误

在 2.2 中提到了错误信息的常见处理方式,在 Golang 1.13 中,标准库中添加了几个方法来帮助我们更快速的完成以上的工作。当前前提是,你的自定义 Error 正确的实现了 Unwrap() 方法

errors.Is 用来将一个错误和一个值进行对比:

// Similar to:
//   if err == ErrNotFound { … }
if errors.Is(err, ErrNotFound) {
    // something wasn't found
}

errors.As 用来判断一个错误是否是一个特定的类型:

// Similar to:
//   if e, ok := err.(*QueryError); ok { … }
var e *QueryError
if errors.As(err, &e) {
    // err is a *QueryError, and e is set to the error's value
}

当操作一个包装了的错误时,IsAs 会考虑错误链上所有的错误。一个完整的例子如下:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

type ErrorB struct {
    Msg string
    Err *ErrorA
}

func (e *ErrorB) Error() string {
    return e.Msg + e.Err.Msg
}

func (e *ErrorB) Unwrap() error {
    return e.Err
}

func main() {
    a := &ErrorA{"error a"}

    b := &ErrorB{"error b", a}

    if errors.Is(b, a) {
        log.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        log.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

使用 %w 来包装错误

Go 1.13 中增加了 %w,当 %w 出现时,由 fmt.Errorf 返回的错误,将会有 Unwrap 方法,返回的是 %w 对应的值。下面是一个简单的例子:

type ErrorA struct {
    Msg string
}

func (e *ErrorA) Error() string {
    return e.Msg
}

func main() {
    a := &ErrorA{"error a"}

    b := fmt.Errorf("new error: %w", a)

    if errors.Is(b, a) {
        fmt.Println("error b is a")
    }

    var tmpa *ErrorA
    if errors.As(b, &tmpa) {
        fmt.Println("error b as ErrorA")
    }
}

输出如下:

error b is a
error b as ErrorA

是否需要对错误进行包装

当你向一个 error 中添加额外的上下文信息时,要么使用 fmt.Errorf,要么实现一个自定义的错误类型,这是你就要决定这个新的错误是否应该包装原始的错误信息。这是一个没有标准答案的问题,它取决于新错误创建的上下文。

包装一个错误是为了将它暴露给调用者。这样调用者就可以根据不同的原始错误作出不同的处理,比如 os.Open(file) 会返回文件不存在这种具体的错误, 这样调用者就可以通过创建文件来让代码可以正确往下执行。

当我们不想暴露实现细节时就不要包装错误。因为暴露一个具备细节的错误,就意味的调用者和我们的代码产生了耦合。这也违反了抽象的原则。

三、参考资料

go 调度器的实现

一、概述

goroutine 是 go 语言的特色之一,在 go 语言中,你可以轻易的使用 go 关键字来创建一个协程运行一段代码,协程在使用上和我们常说的线程相似,但是在 go 中,协程的实现并非是直接使用线程。原因有二:

  • 线程的实现由操作系统决定,它附带了太多额外的特性,比如线程有它自己的信号掩码,线程能够被赋予 CPU affinity 功能,线程能够被添加到 Cgroup 中,线程所使用的资源也可以被查询到,线程的栈空间大小默认是 2M 等等。这些在 goroutine 中不需要的特性带来了额外的性能开销。

  • 除了上面的性能问题,在 go 语言的编程模型中,操作系统无法作出最佳的决策。比如在运行一次垃圾收集时,go 的垃圾收集器要求所有线程都被停止,并且内存处于一致性状态(关于内存一致性,可以参考这篇文章:内存一致性模型)。这个涉及到要等待全部运行时线程(running threads)到达一个点(point),我们事先知道在这个点内存是一致的。当在一个随机点上调度了多个线程,你不得不等待他们中的大多数到达一致性状态。go 调度程序可以决定仅在知道内存一致的点上进行调度,这样在任何时刻,当前没有被调度的线程是处于内存一致状态的,这意味着当我们要为垃圾收集停止线程运行时,我们只需要等待那些 CPU 核心上运行的线程。

因此,go 语言实现一个自己的调度器,它可以创建更轻量的线程,也就是 goroutine,同时,调度算法需要更智能,以提升大量并发下的性能。

在讨论 go 的调度器之前,我们还要讨论一下常见的线程模型,具体的内容可以参考这篇文章:内核线程与用户线程的一点小总结.

我们根据线程的调度实现,将线程分为内核线程和用户线程。其中,内核线程是由操作系统调度,而用户线程是由用户自己实现的调度器进行调度。根据用户线程和内核线程的关系分为下面几种线程模型:

1:1模型

1:1 模型很简单,就是将一个用户线程映射或绑定到一个内核线程上,但是这会导致线程的上下文切换会很慢。

N:1模型

N:1 模型中会将多个用户线程映射或绑定到一个内核线程上,这样多个用户线程之间的上下文切换会很快,但是缺点是没法利用多核的优势。

M:N模型

1:1 模型 和 N:1 模型都有它们各自的缺点,因此 go 调度器中使用了 M:N 模型,既能保证利用到多核的优势,也能保证更快的上下文切换。这也是我们接下来要讨论的问题。

二、go 1.0 的调度器实现方案

go 1.0 的调度器实现方案其实并不是这篇文章的关注点,因为它的生命周期太短。之所以要拿出来单独说,是为了更好的理解 go 1.1 中调度器实现方案解决的问题以及带来的性能提升。

为了更好的阐述这中间的机制,我们使用符号 M 来表示内核线程,使用符号 G 来表示 goroutine。在这个版本中,只有一个全局的 goroutine 队列,所有的内核线程都要从这个队列中取出 和放回goroutine。下图是一个包含两个内核线程,只有一个全局队列的例子。

go scheduler 1.0

只有一个全局队列,导致无法保证一个 goroutine 会在同一个内核线程上调度。因此,一个 goroutine 会在不同的内核线程上调度,导致上下文切换较慢,非常影响性能。下面是一个阻塞通道的例子,用来说明这个问题:

goroutine G7 阻塞在 channel 上,等待接收一个消息。一旦接收到这个消息,G7 就被放到全局队列中。

go scheduler 1.0

G7 被放到队列尾后,队列首的 GX 得到了被执行的机会,因此它被调度上第一个 M 上执行。此时,G8 也阻塞在 channel 上。

go scheduler 1.0

G7 重新得到了执行的机会,但是因为第一个 M 正在执行 GX,因此 G7 只能调度到第二个 M 上执行。这时候就发生了跨内核线程的上下文切换。

go scheduler 1.0

只有一个全局队列还带来了另外一个问题,因为从队列中获取 goroutine 必须要加锁,导致锁的争用非常频繁。尤其是在大量 goroutine 被调度的情况下,对性能的影响也会非常明显。

另外在 Scalable Go Scheduler Design Doc 这篇文章中还提到了另外两个问题。

  • 所有的 M 都关联了内存缓存(mcache)和其他的缓存(栈空间),但实际上只有正在运行的 go 代码的 M 才需要 mcache(阻塞在系统调用的 M 不需要 mcache)。运行 go 代码的 M 和系统调用阻塞的 M 比例大概在 1:100,这就导致了大量的资源消耗(每个 mcache 会占用到 2M)以及 poor data locality(poor data locality找不到好的翻译,意思大概是内存缓存命中会很少,导致内存缓存无效,可参考这里:What does it mean that hash sets have poor data locality?

  • 激进的线程阻塞/解阻塞。因为系统调用导致工作线程经常被阻塞和解阻塞,这增加了很多的负担。

上面就是4个关于 go 1.0 中调度器的问题所在。Dmitry Vyukov 因此提出了新的调度算法,并在 go 1.1 中发布。

三、go 1.1 之后的调度器实现方案

针对在 1.0 中调度器实现的问题,go 1.1 在 M, G 的基础上,引入了新的角色 P(processor)。以下简单列出新的调度算法的改进点:

  • 新角色 P 代替了 M 的一部分功能,M 的 mcache 现在属于 P 了,并且 P 的数量等于 GOMAXPROCS。M 要执行 G 的时候,就被调度绑定一个 P,然后去执行 G,这样对内存的占用就大大减少。
  • 每个 P 都有自己的 goroutine 队列 runq,新的 G 就放到自己的 runq 上,满了之后再放到全局的 runq,优先执行自己的 runq。这样的设计大大减少了锁的争用,并且可以保证尽量少的在多个核心上传递 G。
  • 当 G 执行网络操作和锁切换时,G 和 M 分离,M 通过调度执行新的 G。这样就可以保证用户在 G 中执行网络操作时不用考虑阻塞线程的问题。
  • 当 M 因为执行系统调用阻塞或 cgo 运行一段时间后,sysmon 协程会将 P 和 M分离,由其他的 M 来结合 P 进行调度。这样 M 就不用因为阻塞而占用不必要的资源。

下面是一个比较易懂的图例:

cast

M 是内核线程,P 是调度的上下文,G 是 goroutine。

in-motion

这里可以看到。M 绑定一个 P 来执行 G,灰色部分代表待执行的、只属于 P 的 goroutine 队列。

syscall

M0 线程因为执行 syscall 导致阻塞,因此调度算法将其和 P 分离,防止其占用 P 的资源,然后将 P 分给 M1 来执行剩下的 goroutine。

steal

第二个 P 中的 goroutine 已经执行结束。因此从第一个 P 中“偷来”一部分的 goroutine 执行。

下面是一个比较详细的 GPM 模型示意图:

gpm

四、附加的参考资料

下面的一些图片是从腾讯技术团队分享的 PPT 中截取的,PPT 地址:深入浅出Golang Runtime

go-runtime

五、参考文档

mock 测试和 gomock 的使用

mock 测试是什么

在平常做单元测试中,常常会依赖外部的系统,这导致单元测试很难写。比如业务系统中有一个用户信息更新的函数 UpdateUserInfo,如果对该函数做单元测试,则需要连接数据库,建立测试所需的基础数据,然后执行测试,最后清除测试导致的数据更新。这导致单元测试的成本很高,并且难以维护。

这时候,mock 测试就可以发挥它的作用了。我们将对数据库的操作做成假的,也就是 mock 出一个假的数据库操作对象,然后注入到我们的业务逻辑中使用,然后就可以对业务逻辑进行测试。

看了描述可能还是有点糊涂,下面会用一个例子来说明

一个 mock 测试的例子

这个例子是一个简单的用户登录,其中,UserDBI 是用户表操作的接口,其实现是UserDB,我们的业务层有 UserService,实现了 Login 方法,我们现在要做的就是对 Login 这里的业务逻辑进行单元测试。项目结构如下:

.
├── db
│   └── userdb.go
├── go.mod
├── go.sum
├── mocks
└── service
    ├── user.go
    └── user_test.go

UserDBI 的代码如下:

type UserDBI interface {
    Get(name string, password string) (*User, error)
}

UserDB 的相关代码如下:


type UserDB struct { db *sql.DB } func NewUserDB(user string, password string, host string, port int, db string) (UserDBI, error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, db) var userDB UserDB var err error userDB.db, err = sql.Open("mysql", dsn) if err != nil { return nil, err } return &userDB, nil } // Get 根据 UserID 获取用户资料 func (udb *UserDB) Get(name string, password string) (*User, error) { s := "SELECT * FROM user WHERE name = ? AND password = ?" stmt, err := udb.db.Prepare(s) if err != nil { return nil, err } defer stmt.Close() var user User err = stmt.QueryRow(name, password).Scan(&user) if err != nil { return nil, err } return &user, nil }

Login 的逻辑如下:


type UserService struct { db db.UserDBI } // NewUserService 实例化用户服务 func NewUserService(db db.UserDBI) *UserService { var userService UserService userService.db = db return &userService } // Login 登录 func (userService *UserService) Login(name, password string) (*db.User, error) { user, err := userService.db.Get(name, password) if err != nil { log.Println(err) return nil, err } return user, nil }

可以知道,通过 NewUserService 可以实例化出 UserService 对象,然后调用 Login 即可实现登录逻辑,但是在 Login 中调用了 UserDB 的 Get 方法,而 Get 方法又会从实际的数据库中去查询。这就是我们这个例子的测试难点:有没有办法不依赖实际的数据库去完成单元测试呢?

这里我们的 NewUserService 的参数是 UserDBI 这个接口,在实际的代码运行中,我们是将 UserDB 的实例化对象传进去的,但是在测试的时候,我们完全可以传入一个不操作数据库的假的对象,这个对象只需要实现了 UserDBI 的接口即可。因此我们创建了一个 FakeUserDB,这个 FakeUserDB 就是我们 mock 出来的内容了。这个 FakeUserDB 非常简单,因为它什么也不包含。

type FakeUserDB struct {
}

然后,这个 FakeUserDB 实现了 Get 方法,如下:

func (db *FakeUserDB) Get(name string, password string) (*User, error) {
    if name == "user" && password == "123456" {
        return &User{ID: 1, Name: "user", Password: "123456", Age: 20, Gender: "male"}, nil
    } else {
        return nil, errors.New("no such user")
    }
}

这里的 Get 方法中既可以返回正常情况,又可以返回错误的情况,完全满足我们的测试需求。这样,我们就完成 mock 测试的一大半内容了,接下来我们来实际写单元测试即可。

func TestUserLoginWithFakeDB(t *testing.T) {

    testcases := []struct {
        Name        string
        Password    string
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, true},
    }

    var fakeUserDB db.FakeUserDB
    userService := NewUserService(&fakeUserDB)
    for i, testcase := range testcases {

        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

执行单元测试:

$ go test github.com/joyme123/gomock-examples/service
ok      github.com/joyme123/gomock-examples/service     0.002s

可以看出,我们在测试时使用了 FakeUserDB,这样就彻底摆脱了数据库,并且这里的单元测试考虑了登录成功和登录失败的方式。

但是手写 FakeUserDB 同样也有点工作量,这个例子为了简洁所以体现不出来。考虑当 UserDBI 这个接口的方法很多的时候,我们需要额外手写的代码量立马就多了起来。还好 go 官方就提供了 gomock 这个工具,来帮我们更好的完成单元测试的工作。

gomock 的使用

gomock 的官方仓库地址是:https://github.com/golang/mock.git。gomock 并不复杂,其主要的工作是将我们刚刚的 FakeUserDB 由手动编写变成自动生成。因此我会用刚刚的例子加上 gomock 再做一遍示范。

gomock 的安装

执行以下命令即可安装:

GO111MODULE=on go get github.com/golang/mock/mockgen@latest

mockgen 会安装在你的 $GOPATH 下的 bin 目录中。

gomock 生成代码

在上面的例子中,我们用 FakeUserDB 实现了 UserDBI 这个接口,这里同样也是使用 mockgen 这个程序生成实现 UserDBI 的代码。

mkdir mocks
mockgen -package=mocks -destination=mocks/userdb_mock.go github.com/joyme123/gomock-examples/db UserDBI

在 mocks 下生成的文件如下:

// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/joyme123/gomock-examples/db (interfaces: UserDBI)

// Package mocks is a generated GoMock package.
package mocks

import (
    gomock "github.com/golang/mock/gomock"
    db "github.com/joyme123/gomock-examples/db"
    reflect "reflect"
)

// MockUserDBI is a mock of UserDBI interface
type MockUserDBI struct {
    ctrl     *gomock.Controller
    recorder *MockUserDBIMockRecorder
}

// MockUserDBIMockRecorder is the mock recorder for MockUserDBI
type MockUserDBIMockRecorder struct {
    mock *MockUserDBI
}

// NewMockUserDBI creates a new mock instance
func NewMockUserDBI(ctrl *gomock.Controller) *MockUserDBI {
    mock := &MockUserDBI{ctrl: ctrl}
    mock.recorder = &MockUserDBIMockRecorder{mock}
    return mock
}

// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockUserDBI) EXPECT() *MockUserDBIMockRecorder {
    return m.recorder
}

// Get mocks base method
func (m *MockUserDBI) Get(arg0, arg1 string) (*db.User, error) {
    m.ctrl.T.Helper()
    ret := m.ctrl.Call(m, "Get", arg0, arg1)
    ret0, _ := ret[0].(*db.User)
    ret1, _ := ret[1].(error)
    return ret0, ret1
}

// Get indicates an expected call of Get
func (mr *MockUserDBIMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call {
    mr.mock.ctrl.T.Helper()
    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockUserDBI)(nil).Get), arg0, arg1)
}

执行测试

代码生成结束之后,我们开始写单元测试了。

func TestUserLoginWithGoMock(t *testing.T) {
    testcases := []struct {
        Name        string
        Password    string
        MockUser    *db.User
        MockErr     error
        ExpectUser  *db.User
        ExpectError bool
    }{
        {"user", "123456", &db.User{1, "user", "123456", 20, "male"}, nil, &db.User{1, "user", "123456", 20, "male"}, false},
        {"user2", "123456", nil, errors.New(""), nil, true},
    }

    ctrl := gomock.NewController(t)
    defer ctrl.Finish()

    userDB := mocks.NewMockUserDBI(ctrl)

    for i, testcase := range testcases {
        userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr)
        userService := NewUserService(userDB)
        user, err := userService.Login(testcase.Name, testcase.Password)

        if testcase.ExpectError {
            assert.Error(t, err, "login error:", i)
        } else {
            assert.NoError(t, err, "login error:", i)
        }

        assert.Equal(t, testcase.ExpectUser, user, "user doesn't equal")
    }
}

我们在测试用例中增加了两个字段:MockUser, MockErr,这就是我们 Mock 出来的数据,通过 userDB := mocks.NewMockUserDBI(ctrl) 实例化 mock 出来的 userDB,这里的 userDB 等价于上一个例子中的 fakeUserDB,然后调用 userDB.EXPECT().Get(testcase.Name, testcase.Password).Return(testcase.MockUser, testcase.MockErr) 这句话,来输入我们想输入的参数,产生我们想要的输出即可。这样在 Login 函数执行时会自动产生我们刚刚设定的 Mock 数据,完成单元测试的需求。

如果传参的时候,对参数不确定,可以使用 gomock.Any() 来代替,如果希望多次调用该方法仍然返回相同的结果,可以使用 .AnyTimes()

总结

mock 测试在实现上的重点是将外部依赖实现成可替换的,例子中使用了 UserDBI 这个接口来抽象出用户表的操作,然后使用参数的方式来实现 UserService 的实例化。接口和使用参数来实例化(也就是不要把外部依赖写死)缺一不可。只要注意到这一点就可以写出方便 mock 测试的代码。

理解go context

理解context

在我刚接触context包时,我是有一点迷惑的。因为在其他的编程语言中很少有接触到context包类似的用法。比如在js绘制canvas中的context,也只是作为保留上下文操作来用的。在go语言的context包中,同样也可以当成上下文来理解,但是在看待context提供的能力时,要从以下两点来理解:

  • context提供了一种管理多个goroutine的机制。
  • context最终形成了一种树形结构。

在深入之前,让我们回忆一下多线程/进程模型中,主线程/进程是如何管理子线程/进程的。如果子线程/进程又派生了其他的线程/进程呢?这一定是一个头疼的问题。

在go语言中,协程也面临了同样的问题。因此官方在go1.7版本中引入了context包。那么context提供了什么样的能力来管理协程呢?先看一个withCancel的简单的例子

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("退出")
            return

        default:
            log.Println("执行逻辑")
            time.Sleep(2 * time.Second)
        }
    }
}

func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go watch(ctx)
    go watch(ctx)
    go watch(ctx)

    time.Sleep(6 * time.Second)
    fmt.Println("可以了,通知子协程停止")
    cancel()
    //为了检测子协程是否停止,如果没有输出,就表示停止了
    time.Sleep(5 * time.Second)
}

调用withCancel的输出如下:

2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
可以了,通知子协程停止
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出

可以看到,我们通过context.WithCancel方法生成了一个ctx和一个cancel,然后主动调用cancel就可以通过所有的子协程退出了。在watch方法的实现中,我们是通过select机制来实现的,一旦context的Done()方法有值,就会调用return退出,否则的话就执行default中我们的业务逻辑。

这样我们就可以随时通知所有的子协程退出了。在上面说到,context最终形成了一种树形结构,是因为在子协程中也可以继续使用新的协程,这样就形成了一个树形的调用了。

协程的树形结构

我们在1中使用cancel方法,就可以向下传播,在2~10号协程中全部退出。

简单的了解context包的使用后,可以看一下context.Context这个接口,为了简洁,我删除了源代码中的注释。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Context接口总共提供了4个方法。

  • Deadline()用来获取当前context的取消时间,第二个返回值ok等于false的时候,表示没有设置。
  • Done()方法返回了一个chan,当chan中读取到值的时候,表示父context已经发起了取消的请求,那么当前协程开始做相关的清理工作然后退出。
  • Err()返回context的取消原因
  • Value()方法用来通过一个key获取当前Context上与之对应的值。

理解Context的树形结构

go中大量的库都使用了context机制,比如database/sql库,net/http库等等,因为这些库都支持了context,使得我们在程序中很容易通过context来管理所有新建的协程,而不用自己实现复杂的机制来管理。一旦我们需要取消,只需要在root context调用cancel方法即可。

一些基本使用

在上面的例子中,我们使用了withCancel来实例化一个可以手动取消的context。context包中同样提供了一些其他的方法。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithDeadline可以设置截止时间。会到达指定时间时自动取消。当然也可以调用CancelFunc来手动取消。

WithTimeout可以设置在一段时间后自动取消,和WithDeadline类似。

参考

Go语言实战笔记(二十)
Golang Context深入理解
Go Concurrency Patterns: context

go处理多态的JSON

最近使用go在对h2o的REST API进行封装时,发现了h2o的JSON返回值中有Polymorphic类型的字段。Polymorphic可以翻译为多态。一个多态的类型可以理解为既可能是float型,也可能是string类型等等。

在我的理解中,一个JSON中每个字段的类型都必须是确定的,在动态语言比如php, js这种,处理这种不确定的类型很方便。但是在go这种静态语言中,一个不确定的类型导致在解析中总是会出现类似于这样子的错误: json: cannot unmarshal string into Go struct field Foo.Value of type int

使用interface{}处理多态的问题

在go中,interface{}是一个空接口,那么所有类型都可以看做实现了这个空接口,因此interface{}可以接收任何类型的值。那么如果一个json既可能是{"mean": 123}, 又可能是{"mean": "NaN"},就可以使用以下结构体:

type Prediction struct {
    Mean interface{} `json:"mean"`
}

但是在使用Mean这个变量的时候,就比较麻烦了。

t2 := `{"mean": "NaN"}`

var p2 Prediction
err = json.Unmarshal([]byte(t2), &p2)

if err != nil {
    log.Println(err)
} else {
    if mean, ok := p2.Mean.(string); ok {
        log.Printf("p2 mean: %s \n", mean)
    } else if n, ok := p2.Mean.(int); ok {
        mean = strconv.Itoa(n)
        log.Printf("p2 mean: %s \n", mean)
    }
}

实现Unmarshaler和Marshaler接口

type Marshaler interface {
        MarshalJSON() ([]byte, error)
}

type Unmarshaler interface {
        UnmarshalJSON([]byte) error
}

在使用json.Unmarshaljson.Marshal时,会自动调用对应变量类型的UnmarshalJSONMarshalJSON方法。这样就可以定义一个FlexString类型

type FlexString string
type Prediction struct {
    Mean FlexString `json:"mean"`
}

然后实现*FlexString的UnmarshalJSON方法和FlexString的MarshalJSON方法。

func (fs *FlexString) UnmarshalJSON(b []byte) error {
    if b[0] == '"' {
        return json.Unmarshal(b, (*string)(fs))
    }
    var n float64
    if err := json.Unmarshal(b, &n); err != nil {
        return err
    }

    s := strconv.FormatFloat(n, 'g', 15, 64)

    *fs = FlexString(s)
    return nil
}

func (fs FlexString) MarshalJSON() ([]byte, error) {
    s := string(fs)
    n, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return json.Marshal(s)
    }

    if math.IsNaN(n) {
        return json.Marshal(s)
    }

    return json.Marshal(n)
}

这样在使用的时候,就可以正常的对多态的Mean进行json解码以及编码了。

func main() {
    t1 := `{"mean": 123.1212}`
    var p1 Prediction
    err := json.Unmarshal([]byte(t1), &p1)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p1 mean value:", p1.Mean)
    }

    res1, err := json.Marshal(p1)
    if err != nil {
        log.Println("res1 error:", err)
    } else {
        log.Println("res1 :", string(res1))
    }

    t2 := `{"mean": "NaN"}`
    var p2 Prediction
    err = json.Unmarshal([]byte(t2), &p2)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p2 mean value:", p2.Mean)
    }

    res2, err := json.Marshal(p2)
    if err != nil {
        log.Println("res2 error:", err)
    } else {
        log.Println("res2 :", string(res2))
    }
}

运行结果如下:

2019/07/09 16:38:42 p1 mean value: 123.1212
2019/07/09 16:38:42 res1 : {"mean":123.1212}
2019/07/09 16:38:42 p2 mean value: NaN
2019/07/09 16:38:42 res2 : {"mean":"NaN"}

123.1212在解码再编码之后,仍然是float类型,NaN仍然是string类型。这里有一个注意事项,就是n, err := strconv.ParseFloat(s, 64)这个方法,如果s是NaN的字符串并不会出错,而是将n变成一个NaN,因此需要使用math.IsNaN进行检查。

如何在go中优雅的热升级服务

一、概述

在日常业务中,服务会经常升级,但是因为某些原因不希望断开和客户端的连接。因此就需要服务的热升级技术。

在研究这个问题之前,可以先看一下nginx是如何做到不间断服务热重启的。

  • 将新的nginx可执行文件替换掉旧的可执行文件。
  • master进程发送USR2信号,master进程在接收到信号后会将pid文件命名为.oldbin后缀。之后启动新的可执行文件,并启动新的worker进程。这个时候会有两个master进程
  • 向第一个master进程发送WINCH信号。第一个master进程会通知旧的worker进程优雅(处理完当前请求)地退出。
  • 如果此时新的可执行文件有问题。可以做以下措施:
    • 向旧的master进程发送HUP信号,旧的master进程会启动新的worker进程,并且不会重新读取配置文件。然后会向新的master进程发送QUIT信号来要求其退出。
    • 发送TERM信号到新的master进程,新的master进程和其派生的worker进程都会立刻退出。旧的master进程会自动启动新的worker进程。
  • 如果升级成功,QUIT信号会发送给旧的master进程,并退出。

完整的文档可以看: http://nginx.org/en/docs/control.html#upgrade

在go中处理这个问题也是这个思路。

二、具体实现

2.1 定义服务

type Server struct {
l         net.Listener  // 监听端口
conns     map[int]net.Conn  // 当前服务的所有连接
rw        sync.RWMutex      // 读写锁,用来保证conns在并发情况下的正常工作
idLock    sync.Mutex        // 锁,用来保证idCursor在并发情况下的递增没有问题
idCursor  int               // 用来标记当前连接的id
isChild   bool // 是否是子进程
status    int  // 当前服务的状态
relaxTime int  // 在退出时允许协程处理请求的时间
}

2.2 处理信号

func (s *Server) handleSignal() {
sc := make(chan os.Signal)

signal.Notify(sc, syscall.SIGHUP, syscall.SIGTERM)

for {
sig := <-sc

switch sig {
case syscall.SIGHUP:
log.Println("signal sighup")
// reload
go func() {
s.fork()
}()
case syscall.SIGTERM:
log.Println("signal sigterm")
// stop
s.shutdown()
}
}
}

这里只处理了两个信号,HUP表示要热升级服务,此时会fork一个新的服务。TERM表示要终止服务。

2.3 如何fork新的服务

func (s *Server) fork() (err error) {

log.Println("start forking")
serverLock.Lock()
defer serverLock.Unlock()

if isForked {
return errors.New("Another process already forked. Ignoring this one")
}

isForked = true

files := make([]*os.File, 1+len(s.conns))
files[0], err = s.l.(*net.TCPListener).File() // 将监听带入到子进程中
if err != nil {
log.Println(err)
return
}

i := 1
for _, conn := range s.conns {
files[i], err = conn.(*net.TCPConn).File()

if err != nil {
log.Println(err)
return
}

i++
}

env := append(os.Environ(), CHILD_PROCESS+"=1")
env = append(env, fmt.Sprintf("%s=%s", SERVER_CONN, strconv.Itoa(len(s.conns))))

path := os.Args[0] // 当前可执行程序的路径
var args []string
if len(os.Args) > 1 {
args = os.Args[1:]
}

cmd := exec.Command(path, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = files
cmd.Env = env

err = cmd.Start()
if err != nil {
log.Println(err)
return
}

return
}

这里会将监听的文件描述符以及所有连接的文件描述符都带到新的服务中。这里只需要在新的服务中重新使用这些文件描述符即可保证不断开连接。

2.4 服务的启动流程

func (s *Server) Start(addr string) {
var err error

log.Printf("pid: %v \n", os.Getpid())

s.setState(StateInit)

if s.isChild {
log.Println("进入子进程")
// 通知父进程停止
ppid := os.Getppid()

err := syscall.Kill(ppid, syscall.SIGTERM)

if err != nil {
log.Fatal(err)
}

// 子进程, 重新监听之前的连接
connN, err := strconv.Atoi(os.Getenv(SERVER_CONN))
if err != nil {
log.Fatal(err)
}

for i := 0; i < connN; i++ {
f := os.NewFile(uintptr(4+i), "")
c, err := net.FileConn(f)
if err != nil {
log.Print(err)
} else {
id := s.add(c)
go s.handleConn(c, id)
}
}
}

s.l, err = s.getListener(addr)
if err != nil {
log.Fatal(err)
}
defer s.l.Close()

log.Println("listen on ", addr)

go s.handleSignal()

s.setState(StateRunning)

for {
log.Println("start accept")
conn, err := s.l.Accept()
if err != nil {
log.Fatal(err)
return
}

log.Println("accept new conn")

id := s.add(conn)
go s.handleConn(conn, id)
}

}

func (s *Server) getListener(addr string) (l net.Listener, err error) {
if s.isChild {
f := os.NewFile(3, "")
l, err = net.FileListener(f)
return
}

l, err = net.Listen("tcp", addr)

return
}

启动时,会判断是否是fork出的新的进程。如果是,则继承从父进程传递过来的文件描述符,并重新监听或作为连接处理。

完整的代码参考github: https://github.com/joyme123/graceful_restart_server_in_golang_demo

go WebAssembly初体验

WebAssembly是一门新的浏览器技术。可以取代一部分js的角色,并且从性能上来说,要比js好很多。WebAssembly目前还处于早期的发展阶段,仍然不够成熟。go语言对WebAssembly的支持也是在1.11版本上刚刚加入。虽然不能投入生产环境,但是可以用来做一些很有意思的事情。

官方WebAssembly的文档:https://github.com/golang/go/wiki/WebAssembly。详细的介绍可以看官方的文档。

一个go WebAssembly的例子

main.go

package main

import "fmt"

func main() {
    fmt.Println("hello WebAssembly")
}

编译这个go文件: GOOS=js GOARCH=wasm go build -o main.wasm main.go

index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Go wasm</title>
</head>
<body>
    <script src="wasm_exec.js"></script>
    <script>
        const go = new Go();
        WebAssembly.instantiateStreaming(
            fetch("main.wasm"),go.importObject).then((result) => {
            go.run(result.instance)
        });

    </script>
</body>
</html>

这里还需要一个wasm_exec.js。这个文件是官方提供的,可以理解为是go编译出来的二进制文件和js间的连接桥梁。这个时候打开浏览器,可以看到控制台下有:hello WebAssembly

需要说明的是,在go里面的所有STDOUT,都会在浏览器的控制台打印出来。

wasm的初始化过程

WebAssembly.instantiateStreaming(fetch("main.wasm"),go.importObject)。浏览器执行这样一段代码来加载main.wasm,并返回一个Promise对象。在加载完毕之后,调用go.run(result.instance)来执行wasm中的代码。

加载一个wasm文件就是这样的简单。

go WebAssembly如何和js交互

上面的例子非常简单。但是WebAssembly技术绝非这样简单。我们可以在go中轻松的调用js中的方法。比如下面这句话:

js.Global().Get("document").Call("getElementById", "maxCubes").Set("value", 256)

相当于js中的

document.getElementById("maxCubes).setAttribute('value', 256)

同样的,我们也可以在go中定义js方法,这样就可以在js中直接调用了。

api.onMemInitCb = js.NewCallback(func(args []js.Value) {
        length := args[0].Int()
        api.console.Call("log", "length", length) // 调用js的console.log("length", length)
        api.inBuf = make([]uint8, length)
        // 拿到这个slice的SliceHeader
        hdr := (*reflect.SliceHeader)(unsafe.Pointer(&api.inBuf))
        ptr := uintptr(unsafe.Pointer(hdr.Data))

        api.console.Call("log", "ptr:", ptr)
        js.Global().Call("gotMem", ptr)

        fmt.Println("初始化Mem成功")
    })

js.Global().Set("initMem", api.onMemInitCb)

这样,我们就可以在js中直接使用initMem方法了。这样子,就相当于打通了go和js之间的通道,使得go WebAssembly几乎无所不能。

go和js之间如何通过内存传值。

这个部分是我在看go WebAssembly部分时最关注的。因为大多数时候,我们传参都不仅仅是256这样的字面值。比如我们在做图片处理的时候,浏览器加载图片后传给wasm去处理,这种时候传递的肯定是一个指向一段内存的指针。

go代码中:

api.onMemInitCb = js.NewCallback(func(args []js.Value) {
    length := args[0].Int()
    api.console.Call("log", "length", length) // 调用js的console.log("length", length)
    api.inBuf = make([]uint8, length)
    // 拿到这个slice的SliceHeader
    hdr := (*reflect.SliceHeader)(unsafe.Pointer(&api.inBuf))
    ptr := uintptr(unsafe.Pointer(hdr.Data))

    api.console.Call("log", "ptr:", ptr)
    js.Global().Call("gotMem", ptr)

    fmt.Println("初始化Mem成功")
})

js代码中:


function gotMem(pointer) { console.log("pointer", pointer) memoryBytes.set(bytes, pointer); // Now the image can be loaded from the slice. console.log("load image") loadImage(); } ...... let reader = new FileReader(); reader.onload = (ev) => { bytes = new Uint8Array(ev.target.result); initMem(bytes.length); let blob = new Blob([bytes], {'type': imageType}); document.getElementById("sourceImg").src = URL.createObjectURL(blob); }; imageType = this.files[0].type; reader.readAsArrayBuffer(this.files[0]);

上面有两段代码,js部分代码是在加载一张图片,然后转换为Uint8Array数组bytes,然后调用initMem方法,传递一个数组bytes的长度作为参数。而initMem是在go代码中定义的,initMem负责调用make([]uint8, length)去初始化需要的内存。然后通过一系列的转换获得申请的内存区域的指针ptr。然后调用js的gotMem将这个ptr传递给js代码。在gotMem中,memoryBytes.set(bytes, pointer)这句代码初始化了这块内存。

这里值得指出的是,memoryBytes是一段在wasm初始化时申请的内存,保存在一个Uint8Array数组中。如果我们打印出来的话,可以发现这段内存有1G。然后我们的go代码中调用make([]uint8, length)申请一块内存时,其实是在这段内存中申请的。比如说申请的区域为201883648~(201883648+107003)。我们只要在js中向memoryBytes的数组中给这块区域赋值,就把值传递给go的对象了。

再考虑一个问题。1G的初始化内存是不是太大了?这个内存是在编译go代码时由编译工具指定的。但是如果我们使用浏览器(比如chrome)的任务管理器查看这个窗口的占用内存时就会发现,实际占用并不会这么大。

在我的理解中(并不一定正确),这和C语言的malloc类似。malloc可以申请大于物理内存的虚拟内存,但是只要你不实际占用这么大内存,是不会有问题的。所以虽然go WebAssembly打印出来有1G的初始化内存,但是如果不是真的会使用,是不会占用这么大物理内存的。

关于初始化内存过大的问题,可以参考这个issues:cmd/compile: wasm code causes out of memory error on Chrome and Firefox for Android

一些关于WebAssembly内存的设计:Finer-grained control over memory

一个用go WebAssembly实现中位切分法处理图片的例子

关于中切分法可以参考我之前的这篇文章:中位切分法颜色量化

可以在这个地址进行预览: 预览地址

在浏览器中运行的效果(这里只展示了FastMap的效果,BestMap会好很多):

fastmap

代码的github地址是:WebAssembly-MedianCut

里面大多数的代码都是用的我之前的代码。可见go WebAssembly还可以非常舒服的复用之前的代码。

中位切分法颜色量化

首先举两个例子。

有一种视频接口叫VGA,这种视频接口有一个最大的缺点,就是同一时刻无法显示超过256种颜色。而对于一张true-color的图片,R、G、B都是有1个byte标示,因此一张true-color的图片可能有2^24种颜色。这远远超过了VGA可以同时展示的颜色数量。

我们都知道有一种图片格式叫PNG(
PNG格式分析与压缩原理
)。它的编码方案中,有一种使用调色板的编码方案。调色板上最多有256种颜色。这样每一种颜色都可以用一个byte的索引值代替。如果将一副超过256种颜色的png图使用调色板的编码方式,那么就可以明显的减少图片的体积。

那么如何将2^24种颜色用256种颜色表示呢?或者说,如何将m种颜色使用n种颜色来代替表示(m>n)。这就是这篇文章主要讨论的问题。

首先,如下图所示,RGB可以映射到三维空间中,R代表X轴,G代表Y轴,B代表Z轴。这样,任意一个RGB的组合都可以在空间中以一个点表示。

rgb-cube

对于一个24-bit的图片来说,通常来说颜色空间是连续的,因为颜色之间的最小差异是几乎不可能察觉的。现在,这个连续的颜色空间要被映射到256种离散的颜色上。将一个连续的变量映射到离散的集合上,就叫做量化

有了上面这些说明后,现在有一张颜色很多的图片。图片上所有的点都被映射到一个空间中的立方体上。因为图片上点的分布一般不是均匀的,那么就可能有很多个点的聚集块。一般来说,聚集的越密,代表这些点的颜色越接近,那么如果我们将这些聚集块的点用聚集块中心点的颜色来表示,这样就可以将很多接近的颜色用一种颜色来代替。其实,这个过程跟聚类是很相似的。只不过一般的聚类算法都是无监督的机器学习,效率要较差。而中位切分法则可以用很快的速度完成这样的聚类。

中位切分法

中位切分法用简短的话描述:将一个图片对应的RGB立方体切割成目标数量的紧凑的RGB立方体,然后用立方体的质心值代表立方体内所有点的值。重复这个过程直到得到想要的颜色数量。

这里假设我们要获取256个颜色。详细的流程如下:

  • 将整张图片转换成一个RGB立方体
  • 找到立方体的最长边,从中位数的地方开始切割。得到两个包含相同数量点的立方体。
  • 对分割成的立方体重复上一步的切割过程直到得到256个立方体。
  • 256个立方体的质心就是要计算的256个颜色值。

用一个例子来说明(为了简单,这里没有B颜色空间),这里有6种颜色,共14个点。想要得到4中颜色输出。

  1. 初始情况
Color (r,g)-coordinates Count
C0 (20,40) 3
C1 (40,20) 2
C2 (5,60) 4
C3 (50,80) 2
C4 (60,30) 1
C5 (80,50) 2
  1. 3次切割后的情况
Cube HistPtr.lower HistPtr.upper Colors Enclosed Cube Centroid
A3 0 0 C0 (20,40)
B3 1 1 C2 (5,60)
A2 2 3 C1,C4 (46.7,23.3)
B2 4 5 C5,C3 (65,65)

具体流程图如下:

第一次:最长边是R,中位数是(20 + 40) / 2 = 30。从30处切割。得到收缩后的矩形。左边的矩形为A1,右边为B1。
第二次: 切割B1的R边,得到A2,B2
第三次: 切割A1的R边,得到A3,B3

这个时候我们已经有4个矩形了。

这里我们有两个颜色映射方式。第一种是快速映射,以矩形的质心作为映射后的颜色值。第二种是最佳映射,得到和其他点的距离和最短的点作为映射后的颜色值。

切割过程

中位切分法的实现

  1. 递归方式。在描述的时候,我们RGB块描述成递归的方式。如下
Split(Cube){
  if (ncubes == 4) return;
  find longest axis of Cube;
  cut Cube at median to form CubeA, CubeB;
  Split(CubeA);
  Split(CubeB);
}

但是这种递归切割的方式是有问题的。结果如下图
递归切割

在递归情况下,B1将一直不会被处理。

2.为了解决1中的问题。我们可以设定一个最大深度(level)。比如我们要得到4个输出颜色。那么log 2 4=2。最大深度应该是2。当切割到最大深度时,我们就不再往下切割,转而切割那些还未被处理的更大的区域。

maxlevel = 2;
Split(Cube,level){
  if (ncubes == 4) return;
  if (Cube's level == maxlevel) return;
  find longest axis of Cube;
  cut Cube at median to form CubeA, CubeB;
  Split(CubeA, level+1);
  Split(CubeB, level+1);
}
  1. 但是这样又会有新的问题出现。如果一个立方体中只有一种颜色(实际上此时只有一个点),不能再往下切割。因此我们需要一种方案去解决这个问题。这里可以维持一个包含所有立方体的数组,然后根据优先级排序切割。优先级可以按照level从小到大排序,但是所有颜色数量为1的立方体都会被忽略。这样每次切割前对这个数组做一次排序,取出优先级最高的立方体进行切割。直到切割出指定数量的立方体或者所有的立方体颜色都为1。
build initial cube from histogram;
set initial cube's level to 0;
insert initial cube in list of cubes;
ncubes = 1;
while (ncubes < maxcubes){
  search for Cube with smallest level;
  find the longest axis of Cube;
  find the median along this axis;
  cut Cube at median to form CubeA, CubeB;
  set CubeA's level = Cube's level + 1;
  set CubeB's level = Cube's level + 1;
  insert CubeA in Cube's slot;
  add CubeB to end of list of cubes;
  ncubes = ncubes + 1;
}

实践

  1. 使用中位切分法提取图片的主题色

  2. 使用中位切分法压缩图片

中位切分法的实现的go语言版本:joyme123/MedianCut点击这里进行效果预览

参考文献

Median-Cut Color Quantization

前端图片主题色提取

go 内存模型

简介

go的内存模型旨在说明:一个协程中对变量v的写入产生的值可以保证被另一个协程中的对变量v的读取观察到。

Happens Before

在一个协程内,读写操作必须按照程序指定的顺序进行。在一个协程内,编译器和处理器可能对读写操作重写排序,但是这个排序的前提是:在当前协程内,不会改变程序的执行行为。但是这个重新排序是不保证其他协程观测到执行顺序是不改变的。比如在协程1中a=1;b=2,但在其他协程的感知中,可能b比a先更新值。

我们这里定义Happens Before(在...之前发生),如果事件e1在事件e2之前发生,那么我们就可以说e2在e1之后发生。如果e1既不在e2之前发生,也不在e2之后发生。那么e1和e2就是同时发生的(并发)。

在一个协程内,Happens Before的顺序就是程序表达的那样。

如果下面两点可以保证,就说明对变量v的读取r允许观察到对变量v的写入w:

  • r不是在w之前发生
  • 在w之后并且r之前没有其他的对v的写入w’

为了保证变量v的读取r观察到v的特定写入w,并且保证w是唯一允许被r观察到的。也就是说,r保证能观察到w。需要做到下面两点:

  • w在r之前发生
  • 其他的对v的写入w’要么发生在w之前,要么发生在w之后

下面两点比上面两点要求更为严格。它保证了没有其他的写入w’和w、r同时发生。

在一个协程内,因为没有并发,所以这两种定义是一致的:读取r可以观察到写入w对变量v最近一次的写入。但是当多个协程同时访问同一个共享变量时,就必须使用同步事件来建立Happens Before语义来保证读取r可以观察到指定的写入w。

在内存模型中,对变量v以0值初始化是一次写入。

对于大于单机器字节的读取和写入,可以看做是对多个单机器字节的乱序操作。

同步

初始化

程序初始化是在单协程内运行的,但是这个协程可能创建其他的协程。它们是并发的。

如果包p引入了包q,则q的初始化函数会在p的初始化函数之前运行。

main.main函数在所有的初始化函数之后运行。

协程创建

go关键字创建协程发生在协程运行之前。

协程销毁

协程的销毁不保证在程序中的任何事件发生之前。比如:

var a string

func hello() {
    go func() { a = "hello" }()
    print(a)
}

这个赋值没有跟随任何同步事件,所以它不保证被其他协程观察到。事实上,激进的编译器会删除整个go语句。

如果需要,可以使用同步原语比如管道通信来建立一个相关的执行顺序。

管道通信

在go的协程中,管道通信是非常重要的一个同步方法。通常发送方和接受方在两个不同的协程中,利用发送和接收这两个有序的动作来进行同步。

1.在有缓冲的管道中,发送一定发生在接收完成前。(A send on a channel happens before the corresponding receive from that channel completes.

例如:

var c = make(chan int, 10)
var a string

func f() {
    a = "hello, world"
    c <- 0
}

func main() {
    go f()
    <-c
    print(a)
}

a = "hello, world"一定在c<-0之前发生,c<-0一定在<-c之前发生,<-c一定在print(a)之前发生。这样就能保证a = "hello, world"print(a)之前发生。则保证可以打印出hello, world

2.管道的关闭一定发生在从管道中接收值之前。

因此上面的例子将<-c替换成close(c)也是可以的。

3.在无缓冲管道中,接收一定发生在发送完成前。(The closing of a channel happens before a receive that returns a zero value because the channel is closed.)

例如下面的例子将发送和接收语句互换了位置。

var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}
func main() {
    go f()
    c <- 0
    print(a)
}

如果上述例子中管道是有缓冲的(e.g., c = make(chan int, 1)) ,就无法保证一定能打印出hello,world

4.在容量为C的管道中,第k个接收发生在k+C个发送完成之前。(The kth receive on a channel with capacity C happens before the k+Cth send from that channel completes.)

第4点推广了第一点的规则。这里其实有一点绕,举个例子:第1个接收发生在1+C个发送完成之前。首先思考:第一个接收能否保证在0+C个发送完成之前?答案是不能。因为管道有C个容量的缓冲,C个发送语句发送完成前,完全可以不调用接收语句。那第1个接收发生在1+C个发送完成之前如何保证,我们知道,当管道缓冲满了之后,就无法向管道中发送,发送语句会阻塞。因此必须在接收之后发送语句才能继续执行。

第四点规则使得计数信号量可以由缓冲管道建模:管道中的数量对应了当前并发量,管道的容量对应了最大并发量。发送语句占用一个信用量,接收语句释放一个信号量。这是限制并发量的一个惯用手段。

下面的例子限制了最大并发量为3:

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func(w func()) {
            limit <- 1
            w()
            <-limit
        }(w)
    }
    select{}
}

同样的,我们也可以用lock和once来实现happens before语义

不正确的同步方式

即使读r可以观察到同时发生的写w的值,但这并不意味这在r之后发生的读r’可以观察到在w之前发生的写w’。例如:

var a, b int

func f() {
    a = 1
    b = 2
}

func g() {
    print(b)
    print(a)
}

func main() {
    go f()
    g()
}

这段程序可能打印出2、0

这种现象使得一些常见的方式失效。比如双重锁定检查

var a string
var done bool

func setup() {
    a = "hello, world"
    done = true
}

func doprint() {
    if !done {
        once.Do(setup)
    }
    print(a)
}

func twoprint() {
    go doprint()
    go doprint()
}

这段程序不能保证print(a)时能够观察到a的值一定是hello, world

同样的,还有一种循环等待的写法也可能有问题,例如:

var a string
var done bool

func setup() {
    a = "hello, world"
    done = true
}

func main() {
    go setup()
    for !done {
    }
    print(a)
}

这段程序也不保证print(a)一定能打印出内容,甚至更坏的情况下无法观察到done发生
了改变,因此程序会死循环下去。

还有一种衍生版本的写法也会有问题

type T struct {
    msg string
}

var g *T

func setup() {
    t := new(T)
    t.msg = "hello, world"
    g = t
}

func main() {
    go setup()
    for g == nil {
    }
    print(g.msg)
}

即使main协程观察到了g被赋值,也不一定能观察到g.msg有值。

参考

The Go Memory Model(原文)