理解go context

理解context

在我刚接触context包时,我是有一点迷惑的。因为在其他的编程语言中很少有接触到context包类似的用法。比如在js绘制canvas中的context,也只是作为保留上下文操作来用的。在go语言的context包中,同样也可以当成上下文来理解,但是在看待context提供的能力时,要从以下两点来理解:

  • context提供了一种管理多个goroutine的机制。
  • context最终形成了一种树形结构。

在深入之前,让我们回忆一下多线程/进程模型中,主线程/进程是如何管理子线程/进程的。如果子线程/进程又派生了其他的线程/进程呢?这一定是一个头疼的问题。

在go语言中,协程也面临了同样的问题。因此官方在go1.7版本中引入了context包。那么context提供了什么样的能力来管理协程呢?先看一个withCancel的简单的例子

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("退出")
            return

        default:
            log.Println("执行逻辑")
            time.Sleep(2 * time.Second)
        }
    }
}

func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go watch(ctx)
    go watch(ctx)
    go watch(ctx)

    time.Sleep(6 * time.Second)
    fmt.Println("可以了,通知子协程停止")
    cancel()
    //为了检测子协程是否停止,如果没有输出,就表示停止了
    time.Sleep(5 * time.Second)
}

调用withCancel的输出如下:

2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
可以了,通知子协程停止
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出

可以看到,我们通过context.WithCancel方法生成了一个ctx和一个cancel,然后主动调用cancel就可以通过所有的子协程退出了。在watch方法的实现中,我们是通过select机制来实现的,一旦context的Done()方法有值,就会调用return退出,否则的话就执行default中我们的业务逻辑。

这样我们就可以随时通知所有的子协程退出了。在上面说到,context最终形成了一种树形结构,是因为在子协程中也可以继续使用新的协程,这样就形成了一个树形的调用了。

协程的树形结构

我们在1中使用cancel方法,就可以向下传播,在2~10号协程中全部退出。

简单的了解context包的使用后,可以看一下context.Context这个接口,为了简洁,我删除了源代码中的注释。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Context接口总共提供了4个方法。

  • Deadline()用来获取当前context的取消时间,第二个返回值ok等于false的时候,表示没有设置。
  • Done()方法返回了一个chan,当chan中读取到值的时候,表示父context已经发起了取消的请求,那么当前协程开始做相关的清理工作然后退出。
  • Err()返回context的取消原因
  • Value()方法用来通过一个key获取当前Context上与之对应的值。

理解Context的树形结构

go中大量的库都使用了context机制,比如database/sql库,net/http库等等,因为这些库都支持了context,使得我们在程序中很容易通过context来管理所有新建的协程,而不用自己实现复杂的机制来管理。一旦我们需要取消,只需要在root context调用cancel方法即可。

一些基本使用

在上面的例子中,我们使用了withCancel来实例化一个可以手动取消的context。context包中同样提供了一些其他的方法。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithDeadline可以设置截止时间。会到达指定时间时自动取消。当然也可以调用CancelFunc来手动取消。

WithTimeout可以设置在一段时间后自动取消,和WithDeadline类似。

参考

Go语言实战笔记(二十)
Golang Context深入理解
Go Concurrency Patterns: context

go处理多态的JSON

最近使用go在对h2o的REST API进行封装时,发现了h2o的JSON返回值中有Polymorphic类型的字段。Polymorphic可以翻译为多态。一个多态的类型可以理解为既可能是float型,也可能是string类型等等。

在我的理解中,一个JSON中每个字段的类型都必须是确定的,在动态语言比如php, js这种,处理这种不确定的类型很方便。但是在go这种静态语言中,一个不确定的类型导致在解析中总是会出现类似于这样子的错误: json: cannot unmarshal string into Go struct field Foo.Value of type int

使用interface{}处理多态的问题

在go中,interface{}是一个空接口,那么所有类型都可以看做实现了这个空接口,因此interface{}可以接收任何类型的值。那么如果一个json既可能是{"mean": 123}, 又可能是{"mean": "NaN"},就可以使用以下结构体:

type Prediction struct {
    Mean interface{} `json:"mean"`
}

但是在使用Mean这个变量的时候,就比较麻烦了。

t2 := `{"mean": "NaN"}`

var p2 Prediction
err = json.Unmarshal([]byte(t2), &p2)

if err != nil {
    log.Println(err)
} else {
    if mean, ok := p2.Mean.(string); ok {
        log.Printf("p2 mean: %s \n", mean)
    } else if n, ok := p2.Mean.(int); ok {
        mean = strconv.Itoa(n)
        log.Printf("p2 mean: %s \n", mean)
    }
}

实现Unmarshaler和Marshaler接口

type Marshaler interface {
        MarshalJSON() ([]byte, error)
}

type Unmarshaler interface {
        UnmarshalJSON([]byte) error
}

在使用json.Unmarshaljson.Marshal时,会自动调用对应变量类型的UnmarshalJSONMarshalJSON方法。这样就可以定义一个FlexString类型

type FlexString string
type Prediction struct {
    Mean FlexString `json:"mean"`
}

然后实现*FlexString的UnmarshalJSON方法和FlexString的MarshalJSON方法。

func (fs *FlexString) UnmarshalJSON(b []byte) error {
    if b[0] == '"' {
        return json.Unmarshal(b, (*string)(fs))
    }
    var n float64
    if err := json.Unmarshal(b, &n); err != nil {
        return err
    }

    s := strconv.FormatFloat(n, 'g', 15, 64)

    *fs = FlexString(s)
    return nil
}

func (fs FlexString) MarshalJSON() ([]byte, error) {
    s := string(fs)
    n, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return json.Marshal(s)
    }

    if math.IsNaN(n) {
        return json.Marshal(s)
    }

    return json.Marshal(n)
}

这样在使用的时候,就可以正常的对多态的Mean进行json解码以及编码了。

func main() {
    t1 := `{"mean": 123.1212}`
    var p1 Prediction
    err := json.Unmarshal([]byte(t1), &p1)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p1 mean value:", p1.Mean)
    }

    res1, err := json.Marshal(p1)
    if err != nil {
        log.Println("res1 error:", err)
    } else {
        log.Println("res1 :", string(res1))
    }

    t2 := `{"mean": "NaN"}`
    var p2 Prediction
    err = json.Unmarshal([]byte(t2), &p2)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p2 mean value:", p2.Mean)
    }

    res2, err := json.Marshal(p2)
    if err != nil {
        log.Println("res2 error:", err)
    } else {
        log.Println("res2 :", string(res2))
    }
}

运行结果如下:

2019/07/09 16:38:42 p1 mean value: 123.1212
2019/07/09 16:38:42 res1 : {"mean":123.1212}
2019/07/09 16:38:42 p2 mean value: NaN
2019/07/09 16:38:42 res2 : {"mean":"NaN"}

123.1212在解码再编码之后,仍然是float类型,NaN仍然是string类型。这里有一个注意事项,就是n, err := strconv.ParseFloat(s, 64)这个方法,如果s是NaN的字符串并不会出错,而是将n变成一个NaN,因此需要使用math.IsNaN进行检查。

如何在go中优雅的热升级服务

一、概述

在日常业务中,服务会经常升级,但是因为某些原因不希望断开和客户端的连接。因此就需要服务的热升级技术。

在研究这个问题之前,可以先看一下nginx是如何做到不间断服务热重启的。

  • 将新的nginx可执行文件替换掉旧的可执行文件。
  • master进程发送USR2信号,master进程在接收到信号后会将pid文件命名为.oldbin后缀。之后启动新的可执行文件,并启动新的worker进程。这个时候会有两个master进程

  • 向第一个master进程发送WINCH信号。第一个master进程会通知旧的worker进程优雅(处理完当前请求)地退出。

  • 如果此时新的可执行文件有问题。可以做以下措施:
  • 向旧的master进程发送HUP信号,旧的master进程会启动新的worker进程,并且不会重新读取配置文件。然后会向新的master进程发送QUIT信号来要求其退出。
  • 发送TERM信号到新的master进程,新的master进程和其派生的worker进程都会立刻退出。旧的master进程会自动启动新的worker进程。
  • 如果升级成功,QUIT信号会发送给旧的master进程,并退出。

完整的文档可以看: http://nginx.org/en/docs/control.html#upgrade

在go中处理这个问题也是这个思路。

二、具体实现

2.1 定义服务

type Server struct {
l         net.Listener  // 监听端口
conns     map[int]net.Conn  // 当前服务的所有连接
rw        sync.RWMutex      // 读写锁,用来保证conns在并发情况下的正常工作
idLock    sync.Mutex        // 锁,用来保证idCursor在并发情况下的递增没有问题
idCursor  int               // 用来标记当前连接的id
isChild   bool // 是否是子进程
status    int  // 当前服务的状态
relaxTime int  // 在退出时允许协程处理请求的时间
}

2.2 处理信号

func (s *Server) handleSignal() {
sc := make(chan os.Signal)

signal.Notify(sc, syscall.SIGHUP, syscall.SIGTERM)

for {
sig := <-sc

switch sig {
case syscall.SIGHUP:
log.Println("signal sighup")
// reload
go func() {
s.fork()
}()
case syscall.SIGTERM:
log.Println("signal sigterm")
// stop
s.shutdown()
}
}
}

这里只处理了两个信号,HUP表示要热升级服务,此时会fork一个新的服务。TERM表示要终止服务。

2.3 如何fork新的服务

func (s *Server) fork() (err error) {

log.Println("start forking")
serverLock.Lock()
defer serverLock.Unlock()

if isForked {
return errors.New("Another process already forked. Ignoring this one")
}

isForked = true

files := make([]*os.File, 1+len(s.conns))
files[0], err = s.l.(*net.TCPListener).File() // 将监听带入到子进程中
if err != nil {
log.Println(err)
return
}

i := 1
for _, conn := range s.conns {
files[i], err = conn.(*net.TCPConn).File()

if err != nil {
log.Println(err)
return
}

i++
}

env := append(os.Environ(), CHILD_PROCESS+"=1")
env = append(env, fmt.Sprintf("%s=%s", SERVER_CONN, strconv.Itoa(len(s.conns))))

path := os.Args[0] // 当前可执行程序的路径
var args []string
if len(os.Args) > 1 {
args = os.Args[1:]
}

cmd := exec.Command(path, args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = files
cmd.Env = env

err = cmd.Start()
if err != nil {
log.Println(err)
return
}

return
}

这里会将监听的文件描述符以及所有连接的文件描述符都带到新的服务中。这里只需要在新的服务中重新使用这些文件描述符即可保证不断开连接。

2.4 服务的启动流程

func (s *Server) Start(addr string) {
var err error

log.Printf("pid: %v \n", os.Getpid())

s.setState(StateInit)

if s.isChild {
log.Println("进入子进程")
// 通知父进程停止
ppid := os.Getppid()

err := syscall.Kill(ppid, syscall.SIGTERM)

if err != nil {
log.Fatal(err)
}

// 子进程, 重新监听之前的连接
connN, err := strconv.Atoi(os.Getenv(SERVER_CONN))
if err != nil {
log.Fatal(err)
}

for i := 0; i < connN; i++ {
f := os.NewFile(uintptr(4+i), "")
c, err := net.FileConn(f)
if err != nil {
log.Print(err)
} else {
id := s.add(c)
go s.handleConn(c, id)
}
}
}

s.l, err = s.getListener(addr)
if err != nil {
log.Fatal(err)
}
defer s.l.Close()

log.Println("listen on ", addr)

go s.handleSignal()

s.setState(StateRunning)

for {
log.Println("start accept")
conn, err := s.l.Accept()
if err != nil {
log.Fatal(err)
return
}

log.Println("accept new conn")

id := s.add(conn)
go s.handleConn(conn, id)
}

}

func (s *Server) getListener(addr string) (l net.Listener, err error) {
if s.isChild {
f := os.NewFile(3, "")
l, err = net.FileListener(f)
return
}

l, err = net.Listen("tcp", addr)

return
}

启动时,会判断是否是fork出的新的进程。如果是,则继承从父进程传递过来的文件描述符,并重新监听或作为连接处理。

完整的代码参考github: https://github.com/joyme123/graceful_restart_server_in_golang_demo

go WebAssembly初体验

WebAssembly是一门新的浏览器技术。可以取代一部分js的角色,并且从性能上来说,要比js好很多。WebAssembly目前还处于早期的发展阶段,仍然不够成熟。go语言对WebAssembly的支持也是在1.11版本上刚刚加入。虽然不能投入生产环境,但是可以用来做一些很有意思的事情。

官方WebAssembly的文档:https://github.com/golang/go/wiki/WebAssembly。详细的介绍可以看官方的文档。

一个go WebAssembly的例子

main.go

package main

import "fmt"

func main() {
    fmt.Println("hello WebAssembly")
}

编译这个go文件: GOOS=js GOARCH=wasm go build -o main.wasm main.go

index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Go wasm</title>
</head>
<body>
    <script src="wasm_exec.js"></script>
    <script>
        const go = new Go();
        WebAssembly.instantiateStreaming(
            fetch("main.wasm"),go.importObject).then((result) => {
            go.run(result.instance)
        });

    </script>
</body>
</html>

这里还需要一个wasm_exec.js。这个文件是官方提供的,可以理解为是go编译出来的二进制文件和js间的连接桥梁。这个时候打开浏览器,可以看到控制台下有:hello WebAssembly

需要说明的是,在go里面的所有STDOUT,都会在浏览器的控制台打印出来。

wasm的初始化过程

WebAssembly.instantiateStreaming(fetch("main.wasm"),go.importObject)。浏览器执行这样一段代码来加载main.wasm,并返回一个Promise对象。在加载完毕之后,调用go.run(result.instance)来执行wasm中的代码。

加载一个wasm文件就是这样的简单。

go WebAssembly如何和js交互

上面的例子非常简单。但是WebAssembly技术绝非这样简单。我们可以在go中轻松的调用js中的方法。比如下面这句话:

js.Global().Get("document").Call("getElementById", "maxCubes").Set("value", 256)

相当于js中的

document.getElementById("maxCubes).setAttribute('value', 256)

同样的,我们也可以在go中定义js方法,这样就可以在js中直接调用了。

api.onMemInitCb = js.NewCallback(func(args []js.Value) {
        length := args[0].Int()
        api.console.Call("log", "length", length) // 调用js的console.log("length", length)
        api.inBuf = make([]uint8, length)
        // 拿到这个slice的SliceHeader
        hdr := (*reflect.SliceHeader)(unsafe.Pointer(&api.inBuf))
        ptr := uintptr(unsafe.Pointer(hdr.Data))

        api.console.Call("log", "ptr:", ptr)
        js.Global().Call("gotMem", ptr)

        fmt.Println("初始化Mem成功")
    })

js.Global().Set("initMem", api.onMemInitCb)

这样,我们就可以在js中直接使用initMem方法了。这样子,就相当于打通了go和js之间的通道,使得go WebAssembly几乎无所不能。

go和js之间如何通过内存传值。

这个部分是我在看go WebAssembly部分时最关注的。因为大多数时候,我们传参都不仅仅是256这样的字面值。比如我们在做图片处理的时候,浏览器加载图片后传给wasm去处理,这种时候传递的肯定是一个指向一段内存的指针。

go代码中:

api.onMemInitCb = js.NewCallback(func(args []js.Value) {
    length := args[0].Int()
    api.console.Call("log", "length", length) // 调用js的console.log("length", length)
    api.inBuf = make([]uint8, length)
    // 拿到这个slice的SliceHeader
    hdr := (*reflect.SliceHeader)(unsafe.Pointer(&api.inBuf))
    ptr := uintptr(unsafe.Pointer(hdr.Data))

    api.console.Call("log", "ptr:", ptr)
    js.Global().Call("gotMem", ptr)

    fmt.Println("初始化Mem成功")
})

js代码中:

function gotMem(pointer) {

    console.log("pointer", pointer)

    memoryBytes.set(bytes, pointer);
    // Now the image can be loaded from the slice.

    console.log("load image")
    loadImage();
}

......

let reader = new FileReader();
reader.onload = (ev) => {
    bytes = new Uint8Array(ev.target.result);
    initMem(bytes.length);
    let blob = new Blob([bytes], {'type': imageType});
    document.getElementById("sourceImg").src = URL.createObjectURL(blob);
};
imageType = this.files[0].type;
reader.readAsArrayBuffer(this.files[0]);

上面有两段代码,js部分代码是在加载一张图片,然后转换为Uint8Array数组bytes,然后调用initMem方法,传递一个数组bytes的长度作为参数。而initMem是在go代码中定义的,initMem负责调用make([]uint8, length)去初始化需要的内存。然后通过一系列的转换获得申请的内存区域的指针ptr。然后调用js的gotMem将这个ptr传递给js代码。在gotMem中,memoryBytes.set(bytes, pointer)这句代码初始化了这块内存。

这里值得指出的是,memoryBytes是一段在wasm初始化时申请的内存,保存在一个Uint8Array数组中。如果我们打印出来的话,可以发现这段内存有1G。然后我们的go代码中调用make([]uint8, length)申请一块内存时,其实是在这段内存中申请的。比如说申请的区域为201883648~(201883648+107003)。我们只要在js中向memoryBytes的数组中给这块区域赋值,就把值传递给go的对象了。

再考虑一个问题。1G的初始化内存是不是太大了?这个内存是在编译go代码时由编译工具指定的。但是如果我们使用浏览器(比如chrome)的任务管理器查看这个窗口的占用内存时就会发现,实际占用并不会这么大。

在我的理解中(并不一定正确),这和C语言的malloc类似。malloc可以申请大于物理内存的虚拟内存,但是只要你不实际占用这么大内存,是不会有问题的。所以虽然go WebAssembly打印出来有1G的初始化内存,但是如果不是真的会使用,是不会占用这么大物理内存的。

关于初始化内存过大的问题,可以参考这个issues:cmd/compile: wasm code causes out of memory error on Chrome and Firefox for Android

一些关于WebAssembly内存的设计:Finer-grained control over memory

一个用go WebAssembly实现中位切分法处理图片的例子

关于中切分法可以参考我之前的这篇文章:中位切分法颜色量化

可以在这个地址进行预览: 预览地址

在浏览器中运行的效果(这里只展示了FastMap的效果,BestMap会好很多):

fastmap

代码的github地址是:WebAssembly-MedianCut

里面大多数的代码都是用的我之前的代码。可见go WebAssembly还可以非常舒服的复用之前的代码。