rook ceph的rgw崩溃问题排查

问题

在开发可视化机器学习平台时,集成的FastRCNN实验一直跑不到结束就会出错。有时候是在下载基础模型以及代码包时出错,有时候在train结束后向predict传递artifacts出错。

过程

首先这个问题出现在局域网内,处于开发环境,因此ceph没有做高可用的部署。其次,ceph是用rook这个项目部署在k8s集群中的。

最后,在使用argo做机器学习的资源调度时,会出现大的数据资源下载和转移出现错误。具体表现为:大量数据下载会出现connectiion refused,日志如下:

2019-10-25 02:35:24 (20.1 MB/s) - Connection closed at byte 528482304. Retrying.
--2019-10-25 02:35:25--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/tho6wHm0UmZeZYbfWBv5lkOZ576838763
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

大量数据上传时也会中断,导致argo无法调用下一步:

日志如下:

NAME            custom-workflow-43-6rhlb.api-train-faster-1699
TYPE            Pod
PHASE           Error
MESSAGE         failed to save outputs: timed out waiting for the condition
START TIME      2019-10-24T06:15:56Z
END TIME        2019-10-24T06:30:34Z
DURATION        14:38 min

这里可能是网络问题,argo的问题或者是ceph的问题。但是当数据量不大的时候不会出现错误。因此检查ceph是否正常

~ » kubectl -n rook-ceph get pods                                                
NAME                                           READY   STATUS      RESTARTS   AGE
csi-cephfsplugin-964zm                         3/3     Running     27         46d
csi-cephfsplugin-dxnbg                         3/3     Running     12         46d
csi-cephfsplugin-provisioner-b66d48bc8-fglq9   4/4     Running     0          12d
csi-cephfsplugin-provisioner-b66d48bc8-x67pd   4/4     Running     0          12d
csi-rbdplugin-5fs2x                            3/3     Running     27         46d
csi-rbdplugin-bddlt                            3/3     Running     12         46d
csi-rbdplugin-provisioner-95dd85d6-7kc4c       5/5     Running     0          12d
csi-rbdplugin-provisioner-95dd85d6-mpjtj       5/5     Running     0          12d
rook-ceph-agent-fs4xq                          1/1     Running     9          46d
rook-ceph-agent-wx6r4                          1/1     Running     4          46d
rook-ceph-mds-myfs-a-774974c8c4-xt2ls          1/1     Running     0          12d
rook-ceph-mds-myfs-b-748d7d7f7d-wftt5          1/1     Running     0          12d
rook-ceph-mgr-a-5f54d44c98-57qcb               1/1     Running     0          12d
rook-ceph-mon-a-6f9fbfc99d-lmb6c               1/1     Running     0          17d
rook-ceph-operator-6f556bcbff-glvt6            1/1     Running     0          12d
rook-ceph-osd-0-7c489dc87b-wkt7x               1/1     Running     0          17d
rook-ceph-osd-1-86cc67cc45-25h4q               1/1     Running     0          12d
rook-ceph-osd-prepare-worknode1-xnmpw          0/1     Completed   0          12d
rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm      1/1     Running     65         12d
rook-ceph-tools-5f5dc75fd5-52jbj               1/1     Running     0          12d
rook-discover-g95ws                            1/1     Running     6          46d
rook-discover-vs5fs                            1/1     Running     13         46d

发现ceph rgw重启了65次,这个肯定是不正常的。查看ceph rgw的日志:

$ kubectl -n rook-ceph logs -p rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm

# 截取了一部分日志
debug 2019-10-25 02:35:13.473 7f4880b98700  1 ====== starting new request req=0x55aa678c48e0 =====
debug 2019-10-25 02:35:14.549 7f4880b98700  0 ERROR: client_io->complete_request() returned Broken pipe
debug 2019-10-25 02:35:14.549 7f4880b98700  1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== starting new request req=0x55aa54a488e0 =====
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== req done req=0x55aa54a488e0 op status=0 http_status=404 latency=0s ======

在我的理解中broken pipe一般出现在向已关闭的连接中写入数据时,会出现这个问题。但是通过日志可以发现,出现broken pipe的错误之后,rgw仍然是在处理请求的,但是部分请求的latency很高。因此这里的broken pipe是表示着rgw开始出现一些异常情况,但不是pod重启的直接原因。

真正导致rgw被杀死的原因是因为rgw进程收到了sigterm信号,然后进程被杀死。

debug 2019-10-25 02:35:24.525 7f494c52f700 -1 received  signal: Terminated from Kernel ( Could be generated by pthread_kill(), raise(), abort(), alarm() ) UID: 0
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm set alarm for 120
debug 2019-10-25 02:35:24.525 7f4962116780 -1 shutting down
debug 2019-10-25 02:35:24.629 7f488cbb0700  0 iterate_obj() failed with -9

使用kubectl describe查看pod的event:

Events:
  Type     Reason     Age                  From                Message
  ----     ------     ----                 ----                -------
  Normal   Killing    15m (x65 over 12d)   kubelet, worknode1  Container rgw failed liveness probe, will be restarted
  Warning  Unhealthy  15m (x249 over 12d)  kubelet, worknode1  Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
  Normal   Pulled     15m (x66 over 12d)   kubelet, worknode1  Container image "ceph/ceph:v14.2.2-20190826" already present on machine
  Normal   Created    15m (x66 over 12d)   kubelet, worknode1  Created container rgw
  Normal   Started    15m (x66 over 12d)   kubelet, worknode1  Started container rgw

这里才是真正的重启原因,kubelet检查pod是否存活,但是请求超时了。意味pods出现的故障,因此杀死了pod并重启。

pod的liveness设置:

Liveness:       http-get http://:80/ delay=10s timeout=1s period=10s #success=1 #failure=3

k8s的liveness机制是检查pod中应用程序存活状态并在出错后自动重启的一种机制。提供了三种方式:

  • 在容器内执行命令,如果执行成功,则表示容器是存活并且健康的。否则就重启容器使得应用程序恢复正常。
  • 使用http请求检查,如果返回的状态码是200则表示正常,否则表示失败。
  • 使用tcp连接检查,如果kubelet可以打开指定端口的socket连接,则表示正常,否则表示失败。

在这个场景下出现Warning Unhealthy 15m (x249 over 12d) kubelet, worknode1 Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers),表示kubelet使用http get检查pod的80端口,但是这个请求却超时了。因此杀死了容器并重启,导致大文件(700MB以上)上传/下载失败。

这里kubelet检查的是http://10.42.2.44:80这个地址,我们回过头看一下argo那边的报错信息,Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.。都是80端口,当然这里千万不能被ip地址误导了,10.42.2.44是pod的ip地址,10.43.126.166是service的ip地址,我们可以验证一下:

~ » kubectl -n rook-ceph get svc                                                 
NAME                              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
rook-ceph-rgw-my-store            ClusterIP   10.43.126.166   <none>        80/TCP              46d

然后再结合之前debug 2019-10-25 02:35:14.549 7f4880b98700 1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======这条日志,latency已经超过了1s,而kubelet的liveness超时时间是1s。

现在基本可以得出以下异常流程:

  1. 因为某些原因,导致rgw出现broken pipe的出错,并且部分请求的lantency时间大大提高。
  2. kubelet周期性的对rgw做liveness的检查,并且检查的http就是rgw的80端口,这个端口因为上面的原因导致lantency超过了1s,而liveness检查的timeout只有1s。因此kubelet认为该pod不健康,选择重启。
  3. kubelet向rgw发送了sigterm信号,rgw关闭进程,pod重启。

猜测可能造成这个问题的原因:

  1. ceph所在机器的性能不够,导致响应请求出现问题。
  2. 局域网的网络问题,因为内部的最高带宽只有10MB/s,但是局域网内的设备很多,网络这部分导致了瓶颈。

关于机器性能的问题,我认为是可以排除的,因为机器性能本身很好,并且开发环境几乎没有请求量,接下来就是验证是因为网络问题导致瓶颈,造成部分接口延迟过高被杀死。

为了验证这个猜想,假设这里有三台机器A,B,C,组成了一个k8s集群,ceph是部署在k8s之上的。在A之上,我用dd命令产生一个40G的大文件,然后在B之上使用wget下载。然后在C上面观察ping的延迟是否上升。

A:

$ dd if=/dev/zero of=test bs=1M count=0 seek=40000

$ python -m SimpleHTTPServer 8999

B:

$ wget http://192.168.50.37:8999/test

--2019-10-25 14:18:30--  http://192.168.50.37:8999/test
正在连接 192.168.50.37:8999... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 41943040000 (39G) [application/octet-stream]
正在保存至: “test”

test      3%[==>            ]   1.54G  11.1MB/s    剩余 73m 5s

C:

$ ping 192.168.50.37
PING 192.168.50.37 (192.168.50.37) 56(84) bytes of data.
64 bytes from 192.168.50.37: icmp_seq=1 ttl=64 time=0.384 ms
64 bytes from 192.168.50.37: icmp_seq=2 ttl=64 time=0.373 ms
64 bytes from 192.168.50.37: icmp_seq=3 ttl=64 time=0.336 ms
64 bytes from 192.168.50.37: icmp_seq=4 ttl=64 time=4.90 ms
64 bytes from 192.168.50.37: icmp_seq=5 ttl=64 time=1.18 ms
64 bytes from 192.168.50.37: icmp_seq=6 ttl=64 time=7.74 ms
64 bytes from 192.168.50.37: icmp_seq=7 ttl=64 time=3.51 ms
64 bytes from 192.168.50.37: icmp_seq=8 ttl=64 time=6.66 ms
64 bytes from 192.168.50.37: icmp_seq=9 ttl=64 time=6.31 ms

网络延迟增加的还是很明显的。

这时候使用argo开始一个新的机器学习的实验,但是这个实验的数据量较小,在之前的使用中都没有问题。结果确实出现了问题:

2019-10-25 06:21:04 (8.10 MB/s) - Connection closed at byte 136314880. Retrying.
--2019-10-25 06:21:05--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/maC8Om6Y3QbSJxSTT9x82rp8908272441
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

那么为了有对照实验,将下载关闭,重新做这个机器学习的实验,结果正常。

解决方法

因为缺少了对ceph这块源代码的研究,上面的结论并不一定正确。但是可以大概得出如何解决,可以先尝试将liveness检测的timeout时间增加。

kubectl -n rook-ceph edit deployment rook-ceph-rgw-my-store-a

把liveness的timeout时间调成5s,这样就解决了这个问题。

etcd分布式锁的实现方式

一、概述

在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是类似的,为了了解其中的工作机制,这里简要的做一下总结。

二、使用方式

etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下几个方法:

  • func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
  • func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
  • func (m *Mutex) Unlock(ctx context.Context) error,解锁

因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。

一个简单的例子如下:

package main

import (
    "context"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "sync"
    "time"
)

var n = 0

// 使用worker模拟锁的抢占
func worker(key string) error {
    endpoints := []string{"127.0.0.1:2379"}

    cfg := clientv3.Config{
        Endpoints:            endpoints,
        DialTimeout:          3 * time.Second,
    }

    cli, err := clientv3.New(cfg)
    if err != nil {
        log.Println("new cli error:", err)
        return err
    }

    sess, err := concurrency.NewSession(cli)
    if err != nil {
        return err
    }

    m := concurrency.NewMutex(sess, "/"+key)

    err = m.Lock(context.TODO())
    if err != nil {
        log.Println("lock error:", err)
        return err
    }

    defer func() {
        err = m.Unlock(context.TODO())
        if err != nil {
            log.Println("unlock error:", err)
        }
    }()

    log.Println("get lock: ", n)
    n++
    time.Sleep(time.Second) // 模拟执行代码


    return nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()


    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    wg.Wait()
}

三、实现机制

Lock()函数的实现很简单。这里可以贴出来看一下:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmpputgetgetOwner。需要注意的是,key是由pfxLease()组成的。

  • cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
  • put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
  • get: get就是通过key来查询
  • getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate()。使用pfx来查询是因为其他的session也会用同样的pfx来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put成功。但是只有最早使用这个pfxsession才是持有锁的,所以这个getOwner的含义就是这样的。

接下来才是通过判断来检查是否持有锁

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是当前的版本号,resp.Succeededcmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。

下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}

waitDeletes方法的实现也很简单,但是需要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。

这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx来put,然后根据各自的版本号来排队获取锁。效率非常的高。

etcd 分布式锁

如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。

当然,这里为什么可以通过revision来判定获取锁的顺序,就需要更深入的了解etcd的内部机制以及raft协议了。

分布式文件上传方案

-## 一、背景
考虑可扩展性,后台的服务肯定是要能够支持任意的扩展的,这样才能在业务量增长时通过增加机器的方式来应对。这对后台服务提出了一个要求,必须处理好分布式环境和单机环境的不同带来的问题。比如:在文件的分片上传这一场景下,应该负载均衡的问题,一个文件的多个分片请求会分布到不同的服务器上,这导致在将多个分片合并成完整文件时出现问题,而单机情况下则完全不会有这样的问题。

二、难点

这个问题的解决方案有很多种,但是需要根据实际情况尽量选择简洁、易部署和维护的方案进行,并且不能丢掉分布式系统的优点。比如网上的有的方案是使用单独的文件上传服务器,但是这就变成了单机服务了。也有使用NFS挂载的方案,即所有的服务器挂载一个相同的NFS目录,所有上传相关的文件都存放在挂载的目录下,这不仅给运维带来了麻烦,为了保证NFS的高可用,也带来了额外的运维成本。

三、解决方案

3.1 借助负载均衡

借助负载均衡的方案很简单,这是和应用无关的一种方法。即通过负载均衡这一层,将同一个文件的不同分片的请求全部导向到同一个服务器上。比如负载均衡这一块使用的是nginx, 通过url hash的方式来完成,可以使用如下的配置:

upstream backend {
    server 0.0.0.0:8080;
    server 0.0.0.0:8081;
    server 0.0.0.0:8082;
    hash request_uri;
}
server {
    location / {
        proxy_pass http://backend;
        proxy_set_header X-Real-IPremote_addr;
        proxy_set_header Host $host;
    }
}

然后写一个简单的服务来验证一下:

func post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    resp.WriteHeader(http.StatusOK)
    _, err := resp.Write([]byte("ok"))
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.Parse()

    http.HandleFunc("/Post", post)
    log.Println("listen ", addr)
    err := http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

我们启动三个服务:

./godemo -addr 0.0.0.0:8080
./godemo -addr 0.0.0.0:8081
./godemo -addr 0.0.0.0:8082

使用curl来post数据过来,请求的地址类似于: http://0.0.0.0/Post?123456?后面可以当成是文件的唯一标识码,比如文件和用户id的组合的md5信息等,这样对于同一个用户上传的同一个文件的不同分片请求,通过url哈希都会得到同样的结果,这样就会被转发到同一个后端服务器。

curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST

结果如下:

端口8080的服务收到了三条请求:
2019/09/26 13:58:52 identity: 123456, value: value
2019/09/26 13:58:56 identity: 123456, value: value
2019/09/26 13:59:03 identity: 123456, value: value

端口8081的服务收到了三条请求:
2019/09/26 13:59:40 identity: 789abc, value: vvvvv
2019/09/26 13:59:43 identity: 789abc, value: vvvvv
2019/09/26 13:59:44 identity: 789abc, value: vvvvv

如果在k8s集群中部署,使用nginx-ingress的话可以使用以下的部署方案:

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
    name: pipeline-ingress
    namespace: default
    annotations:
      nginx.ingress.kubernetes.io/proxy-body-size: "50m"
      nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
    rules:
        - host: pipeline.dev.com
          http:
              paths:
                  - path: / 
                    backend:
                        serviceName: pipeline-service
                        servicePort: 8888

当然这种方式需要注意你的每个请求的URI都需要加上额外的参数(比如用户的userId的md5),这样才能均匀的分布到不同的服务器上。

3.2 后台程序自动proxy请求

这个方案的思路是集群中的每个服务实例都有单独的标识,文件分片在第一次上传时会返回给它一个该请求所属服务器的identity,之后所有的请求会带上这个identity,之后收到请求的服务器会检查这个identity是不是属于自己,如果不属于自己,则把这个请求转发给所属的服务器。

这个方案要求每台服务器都知道其他所有服务器的identity和地址。这里我们可以使用etcd这样的分布式数据库来存储。每台服务器在启动时都向etcd里注册自己的identity和address,之后服务器转发的时候都向etcd里面查找对应的address即可。

下面是一个示例的代码:

package main

import (
    "context"
    "encoding/json"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "go.etcd.io/etcd/clientv3"
    "net/url"
    "time"
)

type Server struct {
    Etcd string
    Addr string
    Identify string
}

type ResponseObj struct {
    Identity string `json:"identity,omitempty"`
    Value string `json:"value"`
}

func getEtcdKV(etcd string) (clientv3.KV, error) {
    cfg := clientv3.Config{
        Endpoints:               []string{etcd},
        // set timeout per request to fail fast when the target endpoint is unavailable
        DialTimeout: time.Second,
    }

    cli, err := clientv3.New(cfg)

    if err != nil {
        return nil, err
    }

    return clientv3.NewKV(cli), nil
}

func httpProxy(anotherServer string, body map[string]string) (*http.Response, error) {
    formData := url.Values{}

    for k, v := range body {
        formData.Set(k ,v)
    }

    return http.PostForm(anotherServer, formData)
}

func (s *Server) Register() error {
    cli, err := getEtcdKV(s.Etcd)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
    _, err = cli.Put(ctx, s.Identify, "http://" + s.Addr)
    cancel()
    if err != nil {
        return err
    }

    return nil
}

func (s *Server) Post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    var data ResponseObj

    if identity != "" && identity != s.Identify {
        // proxy
        cli, err := getEtcdKV(s.Etcd)
        if err != nil {
            log.Println("get kv client error", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        etcdResp, err := cli.Get(context.Background(), identity)
        if err != nil {
            log.Println("get value error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        if len(etcdResp.Kvs) == 0 {
            log.Println("没有值")
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        proxyAddr := string(etcdResp.Kvs[0].Value)

        log.Println("proxy to: ", proxyAddr)

        text := map[string]string {
            "key": v,
            "identity": identity,
        }

        proxyResp, err := httpProxy(proxyAddr+"/Post", text)
        if err != nil || proxyResp.Body == nil {
            log.Println("http proxy error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        bodyData, err := ioutil.ReadAll(proxyResp.Body)
        if err != nil {
            log.Println("read proxy body error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        err = json.Unmarshal(bodyData, &data)
        if err != nil {
            log.Println("json unmarshal error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }
    } else {
        data.Identity = s.Identify
        data.Value = v
    }

    text, err := json.Marshal(data)
    if err != nil {
        log.Println("marshal json error: ", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    }

    resp.WriteHeader(http.StatusOK)
    _, err = resp.Write(text)
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    var identity string
    var etcd string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.StringVar(&identity, "identity", "", "identify the server")
    flag.StringVar(&etcd, "etcd", "http://127.0.0.1:2379", "etcd server url")
    flag.Parse()

    if identity == "" {
        log.Fatal("identity不可为空")
    }

    var server = &Server{
        Addr:     addr,
        Identify: identity,
        Etcd: etcd,
    }

    err := server.Register()
    if err != nil {
        log.Fatal("register server error, check your etcd server: ", err)
    }

    http.HandleFunc("/Post", server.Post)
    log.Println("listen ", addr)
    err = http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

3.3 基于S3存储的方案

兼容S3的存储系统有一个对外的接口叫做: ComposeObject。这个接口可以将多个文件合并成一个文件存储到指定位置。如果我们的底层存储是基于兼容S3的系统,那么就可以利用这个接口轻松的实现。

方案的步骤可以描述成以下:

  1. 前端实现将文件分片,然后上传
  2. 上传的请求会因为前端负载均衡的原因分布到不同的服务器,每个上传请求都要带上这个文件、用户id、随机字符串组合的md5值,服务器收到请求后,只管将分片上传到以md5值为名的目录下,一旦上传成功,使用etcd或redis这样作为分片计数器+1.
  3. 分片总数等于总分片数时调用ComposeObject组合所有分片存储到指定位置即可。

ceph架构研究

这篇文章的绝大多数内容都是官网原文的翻译:Ceph Architecture

ceph是什么

ceph是一个分布式的文件存储系统,它提供了以下三种存储能力:

  • 块设备存储(block device)
  • 文件系统存储(filesystem)
  • 对象存储(object storage)

因此如果你需要多种存储方案的话,ceph是一个非常好的选择。这几种存储方案的差别是什么呢?

块设备存储是将底层的存储能力以逻辑硬盘的方式暴露给主机使用。主机只感知到单块物理硬盘的挂载,底层的复杂机制都被屏蔽

对象存储是将文件抽象成对象,然后通过网络接口(比如s3的api)来对文件进行操作(put,get,delete等)。

文件系统存储是像FTP,NFS这种,可以将ceph的存储能力当成普通的文件系统来使用,也支持cd, ls这样的文件系统命令。

ceph filesystem

ceph 架构一览

ceph arch

通过上图可以看出,ceph的底层核心是RADOS,然后通过LIBRADOSCEPHFS以及基于LIBRADOS的RADOSGW, RBD对外提供存储功能。

ceph 存储集群

ceph基于RADOS提供了无限的可扩展的存储集群。关于RADOS的知识可以看这篇论文: RADOS – A Scalable, Reliable Storage Service for Petabyte-scale Storage Clusters

一个Ceph存储集群包括两类常驻服务:

  • Ceph Monitor
  • Ceph OSD Daemon

ceph daemons

Ceph Monitor维护了一个关于集群信息映射的主副本。Ceph monitors集群保证了在一个monitor服务停止时的高可用性。存储集群客户端从Ceph Monitor获取一份集群信息映射的拷贝。

Ceph OSD服务负责检查自身的状态以及其他OSD的状态,然后汇报给monitors。

存储集群客户端和每一个Ceph OSD服务使用CRUSH算法来高效的计算数据位置的信息,而不是依赖于一个中心化的查找表。librados是一个很重要的角色,它提供了Ceph的高层特性,一系列的服务接口都是建设在librados上层的。

存储数据

Ceph存储集群从Ceph客户端接受数据,这里的客户端可以是Ceph块设备存储,对象存储,文件系统或者是基于librados的自定义实现。librados将数据作为一个对象存储。每一个对象都在文件系统中对应一个文件,这些文件存储在对象存储设备上。Ceph OSD服务在存储磁盘上处理这些读写操作。

osd handle read/write operations

Ceph OSD服务在一个扁平的命名空间中(没有目录结构)存储所有的数据,每个数据都作为一个对象。一个对象有唯一标识符,二进制数据,以及有一系列键值对的元数据。这完全取决于Ceph客户端的实现。举例来说,CephFS使用元数据来存储文件属性,比如文件拥有者,创建日期,最后修改日期等等。

ceph object

注意: 对象ID是在整个集群中唯一的,而不仅仅是本地文件系统。

扩展性和高可用

在传统架构中,客户端和一个中心化的组件(比如gateway, broker, API, facade等)通信。这个中心化组件在一个复杂系统中扮演了一个单点的入口。这在性能和扩展性上都导致了局限性————单点失败。(比如,如果中心组件宕机,整个集群都不可用)。

Ceph消灭了这个中心化的入口,让客户端直接和Ceph OSD服务通信。Ceph OSD服务在其他Ceph节点上创建对象副本来保证数据安全和高可用。Ceph同样使用一个monitor的集群来保证高可用。为了消灭中心化,Ceph使用了一个叫做CRUSH的算法。

CRUSH介绍

Ceph客户端和Ceph OSD服务都使用CRUSH算法来高效的计算对象存储位置信息,而不是必须取决于一个中心化的查找表。相比于之前的方案,CRUSH提供了一个更好的数据管理机制,通过清晰的将工作分布到集群中的所有客户端和OSD服务,可以达到很好的扩展性。CRUSH使用智能的数据复制来保证灵活性,这更适用于超大规模的存储。

集群映射表

Ceph依赖于Ceph客户端和OSD服务,它们包含了5个映射表。

  • 监控映射表: 包含集群的fsid, 位置, 名字地址以及每个monitor的端口。它同样记录了当前的代数,映射表的创建时间,最后一次修改时间。可以使用ceph mon dump来查看一个监控表

  • OSD映射表: 包含集群的fsid, 这个映射表的创建时间和最后修改时间,pools的列表,副本数量,PG数量,OSD的列表以及他们的状态。使用ceph osd dump来查看osd表。

  • PG映射表: 包含PG版本号, 时间戳,最新的OSD映射表的代数,全比率,每一个放置组的详情,比如PG ID, Up Set, Acting Set,以及PG的状态(比如active + clean),每一个pool的数据使用统计信息。

  • CRUSH映射表: 包含存储设备的列表,失败域的层级(比如device,host,rack,row,room等),存储数据时的遍历层级的规则。

  • MDS映射表: 包含当前的MDS映射表的代数, 映射表的创建时间,最后修改时间。它同样包含存储元数据的池,元数据服务器的列表,那些元数据服务器是up和in的。执行ceph fs dump来查看MDS映射表。

每一个映射表都包含它的操作状态改变的可迭代历史。Ceph Monitors维护一个集群映射表的主拷贝,包含了集群成员,状态,改变,以及Ceph存储集群的全局健康状态。

高可用的监控服务

在Ceph客户端可以读或写数据前,它们都必须和一个Ceph监控服务通信来获取最新的集群映射表的拷贝。一个Ceph存储集群可以只有一个Monitor角色。当然,这会导致单点失败。

为了增加可靠性和错误容忍,Ceph支持监控服务的集群。在一个监控服务的集群中,延迟和其他错误会导致一个或多个监控服务落后于集群的当前状态。因此,Ceph必须在各个监控服务实例之间就集群状态达成一致。Ceph总是相信大多数的监控服务。Paxos算法被用来就当前集群状态达成共识。

高可用的认证

为了鉴别用户以及防止中间人攻击,Ceph提供了cephx认证系统来认证用户和后台服务。

cephx协议不致力于传输过程中数据加密(比如SSL/TLS)或者是其他的加密。

Cephx使用共享的秘钥来认证,这意味着客户端和监控集群都有一份客户端秘钥的拷贝。认证协议需要双方都能证明给对方自己拥有这个秘钥的拷贝,但是又不能把秘钥原文说出来。这提供了两端的验证,意味着集群确信用户有这个秘钥,用户也确信集群有这个秘钥。

Ceph的一个关键的可扩展的特性是避免对Ceph object store的实现有中心化接口,这意味着Ceph客户端必须能够和OSD直接通信。为了保护数据,Ceph提供了cephx认证系统。cephx协议的机制和Kerberos类似。

用户调用Ceph客户端来和监控服务器通信。和Kerberos不同的是,每一个监控服务都可以验证用户以及分发私钥,所以在使用cephx时不再有单点失败或瓶颈了。监控服务返回一个认证数据结构,类似于Kerberos ticket,包含获取Ceph服务的会话秘钥。会话秘钥被用户永久秘钥加密过,因此只有用户可以向Ceph监控服务请求服务。之后客户端使用会话秘钥来请求它想要的服务,监控服务向客户端提供一个ticket,这个ticket可以向OSD认证客户端来处理数据。
Ceph Monitors和OSD共享秘钥,因此客户端可以使用监控服务提供的ticket和任意的OSD或元数据服务器通信。和Kerberos一样,cephx ticket会过期,因此攻击者不能使用过期的ticket或会话秘钥来获取服务。

为了使用cephx,管理员必须首先设置用户。在下图中,client.admin这个用户从命令行中调用ceph auth get-or-create-key来生成用户名和秘钥。Ceph的认证系统生成用户名和秘钥,存储一份拷贝给所有的监控服务,然后将用户秘钥传送给client.admin用户。这意味着客户端和监控服务共享同一个秘钥。

request create user

为了通过监控服务的认证,客户端把用户名发送给监控服务,监控服务生成一个会话钥匙,使用秘钥附加上用户名来加密。之后,监控服务把加密后的ticket返回给客户端。客户端使用共享的秘钥解密,获取到会话钥匙。会话钥匙为当前的会话提供认证。客户端之后请求由这个会话钥匙签名的ticket。监控服务生成ticket,使用用户秘钥加密并返回给客户端。客户端解密ticket,使用它来签名给OSD和元数据服务器的请求。

client to osd

cephx协议认证每个在客户端机器和Ceph服务器之间的会话。每一个在客户端和服务器之间发送的信息,都会接连通过初始化认证,使用ticket签名,这个签名可以由监控服务,OSD和元数据服务器用他们共享的秘钥来验证。

complete process

由这种认证提供的保护是在Ceph客户端和Ceph服务端之间的。在Ceph客户端之外这个认证是不管用的。如果用户通过远程连接到Ceph客户端上,Ceph的认证不会应用到这个远程连接上。

智能的后台服务造就了超大规模的可扩展性

在很多集群架构中,集群成员的主要目的是通过一个中心化的接口知道哪些节点可以访问。然后中心化的接口通过二次转发来提供服务,这回导致在pb级别向eb级别扩展时有很大的瓶颈。

Ceph消灭了这个瓶颈:Ceph的OSD服务和Ceph客户端是集群感知的。比如Ceph客户端、Ceph OSD服务都知道集群中的其他Ceph OSD服务。这使得Ceph OSD服务可以直接和其他Ceph OSD服务以及Ceph监控服务通信。另外,它使得Ceph客户端可以直接和Ceph OSD服务直接通信。

这种方法也最大化的利用了节点的CPU和RAM, 带来了以下几个主要的好处:

  • OSD 为客户端直接提供服务:因为任何网络设备都有最大并发连接的瓶颈,一个中心化的系统在高扩展的情况下会出现一个很低的物理瓶颈。通过让Ceph客户端直接和Ceph OSD服务通信, Ceph同时增加了性能和系统容量,并且解决了单点失败的问题。Ceph客户端可以维护一个和Ceph OSD服务的会话,而不是一个中心化的系统,只要它们需要这样做。

  • OSD成员和状态:Ceph OSD服务加入到集群中并且汇报它们的状态。在最低级别上,Ceph OSD服务状态是up或者down的,代表着它是否能够处理Ceph客户端的请求。如果Ceph OSD服务是down的,这意味着Ceph OSD服务的异常。如果Ceph OSD服务不在运行(crash了),Ceph OSD服务就不能通知Ceph监控服务它停止运行了。OSD服务周期性的给Ceph监控服务发送消息,如果Ceph监控服务在约定的周期内没有收到OSD服务的消息,它就把这个OSD服务标记为down。这种机制叫做failsafe(failsafe可以理解可以故障的自动保护装置,当OSD出现故障会自动将其排除从而避免引发更大的故障)。当然,OSD服务也会查看它相邻的OSD服务是否停止,然后汇报给Ceph监控服务。这保证了Ceph监控服务是一个轻量级的进程(脏活累活都被OSD服务做了)。

  • 数据清理:作为维护数据一致性和清洁度的一部分,Ceph OSD服务可以在PG(placement group, 放置组)中清理对象。Ceph OSD服务可以将一个PG中的对象和存储在另一个OSD服务中的副本进行元数据的对比。清理操作(通常是每天)会找出bug或文件系统的错误。Ceph OSD服务同样会执行更深层次的清理,通过对对象执行按位的对比。深层次的清理(通常是每周)会找到硬盘上坏的扇区。

  • 数据复制:像Ceph客户端一样,Ceph OSD服务使用CRUSH算法,但是Ceph OSD服务使用它来计算对象的副本应该存储在哪里(或者是重新负载均衡)。在一个典型的写操作场景下,一个客户端使用CRUSH算法来计算对象存储在哪里,将这个对象映射到pool和PG中,然后查看CRUSH映射表来鉴别PG组的主OSD服务

    客户端将对象写入到主OSD服务的PG组中。之后,主OSD携带着他自己的CRUSH映射表的拷贝存储到第二和第三个OSD服务中,这是为了多副本的备份。一旦它确认数据存储成功就返回给客户端。

osd write for replication

因为有这样的数据复制的能力,Ceph OSD服务使得Ceph客户端不需要负责这件事,同时也能保证高的数据可用性和数据安全。

动态集群管理

在高可用和可扩展性章节,我们解释了Ceph如何使用CRUSH、集群感知、智能服务扩展以及维护高可用。Ceph的关键设计就是自治、自我恢复、智能Ceph OSD服务。下面我们更深入的探究一下CRUSH如何在现代化的云存储架构中发挥作用,包括存储数据、集群的重新负载均衡以及动态的从错误中恢复。

关于pool

Ceph存储系统支持一个叫做pool的概念,这是存储对象的逻辑分区。

Ceph客户端从Ceph监控服务中获取集群映射表,然后向pool中写入对象。pool的大小或副本数量、CRUSH的规则以及PG的数量决定了Ceph如何存储数据。

how crush work

pool至少会设置以下参数:

  • 对象的拥有者/访问权限
  • PG的数量
  • 使用的CRUSH规则

映射PG到OSD

每一个pool都有一系列的PG。CRUSH动态的将PG映射到OSD中。当Ceph客户端存储对象,CRUSH将会映射每一个对象到一个PG中。

将对象映射到PG创建了一个中间层,这个中间层位于Ceph OSD服务和Ceph客户端中间。在动态的存储对象时,Ceph存储系统必须能够增长(或收缩)以及重新负载均衡。如果Ceph客户端知道哪个Ceph OSD服务拥有哪一个对象,这就导致Ceph客户端和Ceph OSD服务紧耦合了。相反的,CRUSH算法将每一个对象映射到PG,然后将每一个PG映射到一个或多个OSD服务。这个中间层允许在有新的OSD服务以及新的底层OSD设备上线时,Ceph可以动态重新负载均衡。下面的图表演示了CRUSH如何映射对象到PG,然后映射PG到OSD。

crush mapping

客户端在有集群映射表的拷贝和CRUSH算法的情况下,可以准确的计算出在读取或写入对象时使用哪一个OSD。

计算PG ID

当一个Ceph客户端绑定到一个Ceph监控服务时,它会获取集群映射表的最新拷贝。有了这个集群映射表,客户端知道所有的监控服务,OSD,元数据服务。然而,它并不知道任何关于对象的位置信息。

对象位置需要计算

客户端唯一需要的输入是对象ID和pool。这很简单:Ceph在命名的pool中存储数据。当客户端存储一个命名的对象时,它使用对象名、一个哈希码、pool中PG的数量和pool名来计算PG。Ceph客户端使用以下步骤来计算PG ID。

  • 1.客户端输入pool名和对象ID(比如: pool=”liverpool”, object-id=”john”)

  • 2.Ceph获取对象ID,然后做一下哈希操作

  • 3.Ceph计算哈希值除PG数量的余数,(比如58)得到了PG ID

  • 4.Ceph获取给定的pool名的pool ID(比如”liverpool”=4)

  • 5.Ceph将pool ID放在PG ID之前(比如4.58)

在一个忙碌的会话中,计算对象位置要比执行对象位置查询快很多。CRUSH算法允许客户端计算对象应该存在哪里,这使得客户端可以直接和主OSD服务通信,存储或获取对象。

peering and sets

在之前的章节中,我们提到Ceph OSD服务检查彼此的心跳,然后汇报给Ceph监控服务。Ceph OSD做的另外一件事叫做”窥探(peering)”,这会让所有存储PG的OSD对PG中的对象状态达成一致。事实上,Ceph OSD服务也会汇报”peering”的失败给Ceph的监控服务。Peering的问题通常会自我解决。

对状态达成一致并不意味着PG有最新的内容

Ceph存储集群被设置成一个对象至少有两个副本(比如, size=2),这是数据安全的最小要求。为了高可用,Ceph存储集群应该存储多于两个的副本(比如size=3并且最小size=2)。这样它就能在维护数据安全的时候以一个降级状态继续运行。

重新负载均衡

当你添加一个Ceph OSD服务到集群中,集群映射表会随之更新。根据之前的计算PG ID的方法,这会改变集群映射表。于是,它改变了对象的位置,因为它改变了计算的一个输入参数。下面的图标描述了重新负载均衡的过程。有一些但不是所有的PG会从已存在的OSD(OSD1和OSD2)中迁移到新的OSD(OSD3)中(这个过程虽然看似很简单粗暴,但其实对大型系统影响很小)。即使在重新负载均衡的过程中,CRUSH仍然是稳定可靠的。大多数PG保留它们原有的配置,每一个OSD获取到更多的可用容量,所以在重新负载均衡之后新的OSD没有负载峰值。

ceph rebalance

数据一致性

作为维护数据一致性和清洁度的一部分,Ceph OSD同样可以在PG中清理对象。关于OSD如何维持各个副本的对象一致性,可以参考上面的数据清理。

纠删码(erasure coding)

一个纠删码pool将每个对象分成K+M个块。数据被分为K个数据块和M个编码块。这个pool被配置成K+M的大小,因此每个块都可以存在一个OSD上。块的排序作为对象的属性存储起来。

举例来说,一个使用5个OSD(K+M=5)的纠删码pool可以容忍2块数据的丢失(M=2)。

读写编码块

当一个叫做NYAN的对象包含”ABCDEFGHI”的内容被写入到pool中,擦除编码功能将内容分成3个数据块:第一部分是ABC,第二部分是DEF,第三部分是GHI。如果内容长度不是K的整数倍,会被填充内容。这个功能同样创建两个编码块:第4个是YXY,第5个是QGC。每一个块都被存储到一个OSD中。这些块以同样的名字(NYAN)以对象的形式存储在不同的OSD上,块的创建顺序必须保留,存储对象的属性中(shard_t),作为名字的额外补充。块1包含ABC,存储在OSD5中,块4包含YXY存储在OSD3中。

erasure conding

当对象NYAN从纠删码pool中读取时,解码功能读取三个块: 块1包含ABC,块3包含GHI,块4包含YXY。然后它重建了对象的原始内容ABCDEFGHI。这个解码功能被通知块2和块5是缺失的(它们被称作’擦除’)。块5不会被读取是因为OSD4从集群中退出了。一旦3个块被读取解码功能就会被调用,而此时OSD2因为最慢所以根本没有被考虑进来。

erasure decode

中断完整的写入

在一个纠删码pool中,主OSD接受所有的写操作。它负责编码内容为K+M块,然后将它们发送给其他的OSD。它同样负责维护PG日志的版本号。

在下面的图表中,一个纠删码的PG被创建成K=2 M=1,它支持3个OSD,两个存储K,一个存储M,分别称为OSD1、OSD2、OSD3。一个对象被编码以及存储在OSD中,块D1v1(数据块序号1,版本1)在OSD1上,块D2v1在OSD2上,C1v1(编码块序号1,版本1)在OSD3上。每一个OSD上的PG日志都是唯一的(1,1 是epoch 1. version 1)

interrupted-fill-writes

OSD1是主服务,接受客户端的完整的写入,这意味着写入的数据要完整的替换对象而不是覆盖一部分。版本2(v2)的对象被创建来覆盖版本1(v1)。OSD1将数据编码到3块: D1v2在OSD1上,D2v2在OSD2上,C1v2在OSD3上。每一个块都被发送到目标OSD上。当OSD接受到消息要写入数据块时,它同样创建一个新的PG日志来反映这个改变。举例来说,只要OSD3存储了C1V2,它添加1,2到日志中。因为OSD是异步工作的,当其他块已经存储好了(比如C1v1和D1v1)一些数据块可能还在处理(比如D2v2)。

write replace

如果一切顺利,数据块都会被存储,日志中的last_complete指针会从1,1移动到1,2

full write success

最后,之前的版本都可以被移除了: OSD1上的D1v1,OSD2上的D2v1, OSD3上的C1v1

remove previous version

但是如果发生了异常,OSD1在D2v2仍然写入的时候停止了,对象的版本2只是部分写入:OSD3有一个块但是不足以恢复。它丢失了两个块: D1v2和D2v2,但是纠删码参数K=2,M=1要求至少2个块是可用的,才能恢复第三个块。OSD4成为新的主服务,找到last_complete日志是1,1,这应该是最新的日志了。

osd4 online

日志1,2被发现在OSD3上,这和在OSD4上存储的最新版本1,1不一样、因此1,2被取消,C1v2块被移除。D1v1块被解码功能重建出来存在OSD4上。

rebuild-on-osd4

缓存分层

缓存层提供给Ceph客户端更好的IO性能,因为一部分数据是存储在缓存中的。缓存层包括创建一个快速/昂贵的存储设备的pool,被配置成缓存层,便宜的存储设备被当成存储层使用。Ceph objecter负责处理将对象放在哪里,tiering代理决定什么时候将对象从缓存中写入到后面的存储层。因此缓存层和背后的存储层对Ceph客户端是完全透明的。

cache tiering

ceph协议

Ceph客户端使用本地的协议来和Ceph存储集群通信。Ceph将它的功能打包到librados库中,因此你可以创建自己的Ceph客户端。下面的图表描述了这种基础架构。

basic architecture

本地协议和librados

现在的应用程序需要一个支持异步通信的简单的对象存储接口。Ceph存储集群提供了这样的一个功能。接口提供了直接的,并行的对象访问功能。

  • Pool操作
  • 快照和写时复制克隆
  • 读写对象-创建和移除-整个对象或字节范围-追加或者截短
  • 创建/设置/获取/移除 XATTRS
  • 创建/设置/获取/移除 键值对
  • 复合操作和二次ack语义
  • 对象类

对象监视和通知

客户端可以注册一个对对象的持久关注,保持一个和主OSD的会话打开。客户端可以发送通知消息和一些负载信息给所有的监视者,然后所有监视客户端都会收到通知。这使得客户端可以使用任何对象作为同步通信通道。

watch and notify

数据分片

存储设备都会有吞吐量的限制,这对性能和可扩展性都有很大的影响。大多数存储系统都支持在多个存储设备上分别存储数据的一部分片段,这会增加吞吐量和性能。最常见的就是RAID了,Ceph的分片和RAID 0类似。

Ceph提供了三类客户端: Ceph块设备、Ceph文件系统、Ceph对象存储。Ceph客户端负责转换数据给用户(一个块设备镜像,RESTful对象,CephFS文件系统目录)。

提示: Ceph存储的对象不会被分片。Ceph对象存储,Ceph块设备,Ceph文件系统将它们的数据分片存储成多个Ceph存储系统对象。Ceph客户端通过librados写入数据必须执行分片(以及并行io)才能获取分片的优点。

最简单的Ceph分片格式包含一个对象的分片。Ceph客户端向存储对象写入分片单元,直到对象达到了它的最大容量,然后创建一个新的对象来存储多出来的数据分片。这种最简单的方式对于小的块设备镜像、s3或swift对象以及CephFS文件是足够的。当然,这种简单的方式没有最大化利用Ceph分布式存储数据的优点,串行的方式也没有提升性能。下面的图标描述了这种形式的分片:

simplest strip

如果你想要更大的镜像尺寸,更大的S3或Swift对象,或者更大的CephFS目录,你应该考虑通过将数据分片分布到一个对象集合中的多个对象上来提升读写性能。并行操作会显著的提升写入性能。因为对象会被映射到不同的PG中,然后被映射到不同的OSD中,每一个写入操作都会以最大的写入速度来并行操作。单硬盘的写入会因为柱头的移动和设备的最大带宽(100M/s)而受到限制(比如每次寻道都要6ms)。 通过将写入分布到多个对象上,Ceph可以减少每个硬盘的寻道时间,然后将多个硬盘的吞吐量结合以达到更快的写入(或读取)速度。

注意:分片是独立于对象复制的。因为CRUSH在OSD中复制对象,分片也会自动被复制。

在下图中,客户端数据被分片到一个对象集合中(对象集合1),这个集合包含4个对象,第一个分片单元是位于object 0 的 strip unit0,第4个分片单元是位于object3的strip unit3。当写如第4个分片后,客户端判断对象集合是否满了。如果对象集合没有满,客户端会从第一个对象继续写入分片。如果对象集合满了,客户端重新创建一个对象集合(object set2)。然后开始写入第一个分片(strip unit16)。

strip to multiple object

下面是三个重要的变量决定了Ceph如何对数据分片:

  • 对象大小: Ceph存储系统中的对象有一个配置的最大容量(比如2MB, 4MB)。这个对象大小应该足够大,以容纳很多个分片单元。

  • 分片宽度:分片有一个配置的单元大小(比如64kb)。Ceph客户端分割数据成同样大小的分片单元,除了最后一个。分片宽度应该是对象大小的一小部分,这样一个对象就可以包含多个分片单元了。

  • 分片数量:Ceph客户端在指定分片数量的对象上写入一连串的分片单元。这些指定数量的对象叫做一个对象集合。在Ceph客户端写完对象集合中的最后一个对象后,它又开始从对象集合中的第一个对象开始写入。

    重要:在投入到生产环境之前应该测试分片配置的性能。写入数据之后就不能修改这些配置了。

    一旦Ceph客户端将数据分片,然后将分片单元映射到对象中,Ceph的CRUSH算法将对象映射到PG中,然后PG被映射到Ceph OSD服务中。

    Ceph客户端

    Ceph客户端包含一系列的服务接口,包括:

    • 块设备:块设备(RBD)服务提供可变大小的、精简配置的块设备,块设备支持快照和复制。Ceph在整个集群中将块设备分片来提高性能。Ceph支持内核对象(KO)和QEMU的管理程序直接使用librbd——避免虚拟系统的内核对象过载。

    • 对象存储:对象存储(RGW)服务提供RESTful API,和Amazon S3以及OpenStack Swift兼容。

    • 文件系统:文件系统(CephFS)服务提供了POSIX兼容的文件系统,可以使用mount这样的命令,或者作为一个用户空间的文件系统。

Ceph可以运行OSD, MDS, 监控服务的额外实例,这些会提高可扩展性和高可用性。下面的图从高层架构上描述了这个情况:

high level architecture

Ceph对象存储

Ceph对象存储服务叫做radosgw,是一个FastCGi程序,提供了RESTful HTTP API来存储对象和元数据。它是在Ceph存储集群的顶层,拥有自己的数据格式,维护自己的用户数据库,认证和访问控制。RADOS入口使用统一的命名空间,这意味着你既可以使用OpenStack Swift兼容的API,也可以使用Amazon S3兼容的API。举例来说,你可以使用S3的API写入数据,使用Swift的API读取数据。

Ceph块设备

Ceph块设备将一个块设备镜像分片到多个Ceph存储集群的对象上,每一个对象都会映射到不同的PG,PG也会分布到不同的ceph-osd服务上。

精简配置的,可生成快照的Ceph块设备是虚拟化和云计算的吸引点。在虚拟机的场景下,人们可以在QEMU/KVM中使用rbd网络存储驱动来部署ceph块设备,主机使用librbd来给客户提供块设备服务。需要云计算栈使用libvirt来集成管理程序。你可以在QUME中使用精简配置的Ceph块设备,使用libvirt来支持OpenStack、CloudStack或其他解决方案。

因为我们目前不提供librbd支持其他的管理程序,你也同样可以使用Ceph块设备内核对象来提供一个块设备给一个客户端。其他的虚拟化技术,比如Xen可以访问Ceph块设备的内核对象。这可以使用命令行工具rbd完成。

Ceph文件系统

Ceph文件系统(CephFS)提供了POSIX兼容的文件系统,处于基于对象的Ceph存储集群之上。

ceph filesystem

Ceph文件系统服务包括Ceph元数据服务(MDS)。MDS的目的是存储所有的文件系统元数据(目录,文件拥有者,访问模式等等),元数据都是存储在内存中的。原因是MDS是的一般操作(ls, cd)会对Ceph OSD服务造成巨大的压力。因此将元数据和数据本身分离开是提供高性能服务的必要条件。

一些不错的文章

https://zhuanlan.zhihu.com/p/58888246