tensorflow-serving 在k8s中的模型部署方案

简介

tensorflow-serving是一个tensorflow模型部署的方案,其在设计时,就考虑了非常灵活的设计,比如:

  • 支持不同的文件系统,并且易扩展
  • 将模型发现、加载、使用和卸载和模型生命周期的管理,以及对外提供服务解耦合,因此非常容易扩展它的模型发现方式,以及同样可以支持其他框架下模型的整合。
  • 整个服务是无状态的,因此方便在k8s上进行部署

下图是 tensorflow serving 的整体架构:

tensorflow serving architecture

模型加载方式

tensorflow-serving支持从不同的地方,以不同的方式去加载模型。比如我们可以直接在启动tensorflow-serving时加上模型的地址,也可以提供模型配置文件来启动服务。

启动时加上参数:

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=/home/jiang/data/yolov3

从配置文件中加载模型:

/etc/config/models.config

model_config_list {
    config {
        name: 'fashion'
        base_path: 's3://models/fashion/'
        model_platform: 'tensorflow'
    }
    config {
        name: 'resnet'
        base_path: 's3://models/resnet/'
        model_platform: 'tensorflow'
    }
}

执行以下命令来加载fashionresnet两个模型:

tensorflow_model_server --port=9000", "--rest_api_port=8500", "--model_config_file=/etc/config/models.config"

模型存储系统

tensorflow-serving的另一个特点就是支持从不同类型的存储系统中加载模型。比如本地的文件系统、s3、hdfs等等

从本地文件系统中加载

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=/home/jiang/data/yolov3

从s3加载

从s3(兼容s3的对象存储系统都可以)中加载模型,需要配置一些环境变量

export AWS_ACCESS_KEY_ID=<key id>
export AWS_SECRET_ACCESS_KEY=<key>
export S3_ENDPOINT=minio-service.minio:9000
export S3_USE_HTTPS=0
export S3_VERIFY_SSL=0
export AWS_REGION=us-west-1
export S3_REGION=us-west-1
export AWS_LOG_LEVEL=3

然后通过以下命令启动服务即可

tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=resnet --model_base_path=s3://models/resnet/

从hdfs中加载

从hdfs中加载需要设置以下的环境变量

  • JAVA_HOME: Java 的安装路径

  • HADOOP_HDFS_HOME: HDFS 的安装路径,如果在LD_LIBRARY_PATH中设置了 libhdfs.so 的路径,那么这个环境变量可以不要。

  • LD_LIBRARY_PATH: 引入 libjvm.so 的路径。如果你的 HADOOP 发行版在 ${HADOOP_HDFS_HOME}/lib/native 这个目录下没有包含 libhdfs.so,也需要引入它。

export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:${JAVA_HOME}/jre/lib/amd64/server
  • CLASSPATH: 注意仅仅是设置 CLASSPATH 环境变量是不行的,需要用以下的方式使用:
CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) tensorflow_model_server --port=9000 --rest_api_port=8500 --model_name=yolov3 --model_base_path=hdfs://worknode2:9000/pipeline/models/yolov3

在k8s中部署

s3

tensorflow-serving 官方提供了docker镜像,因此使用 s3 的方式加载模型部署是很简单的。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tfserving-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: tfserving
    spec:
      containers:
      - name: serving-container
        image: tensorflow/serving:1.14.0  
        ports:
        - containerPort: 8500
        - containerPort: 9000
        env:
        - name: AWS_ACCESS_KEY_ID
          value: J5WW5NKKV7AE9S0WZCM1
        - name: AWS_SECRET_ACCESS_KEY
          value: TbG0Y6nnUV8nQNLL9n4B3u3UPMMCJvqs2COx3and
        - name: S3_ENDPOINT
          value: minio-service.minio:9000
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_REGION
          value: us-west-1
        - name: S3_REGION
          value: us-west-1
        - name: AWS_LOG_LEVEL
          value: "3"
        command: ["/usr/bin/tensorflow_model_server"]
        args: ["--port=9000", "--rest_api_port=8500", "--model_name=resnet", "--model_base_path=s3://models/resnet/"]

---
apiVersion: v1
kind: Service
metadata:
  labels:
    run: tf-service 
  name: tf-service
spec:
  ports:
  - name: rest-api-port
    port: 8500
    targetPort: 8500
  - name: grpc-port
    port: 9000
    targetPort: 9000
  selector:
    app: tfserving 
  type: NodePort

hdfs

在官方提供的 docker 镜像中,并没有打包 hdfs 的环境,因此我们需要自己构建一个镜像:

Dockerfile 所在目录如下:

hdfs_dockerfile
├── Dockerfile
└── hadoop-2.10.0
    ├── bin
    ├── etc
    ├── include
    ├── lib
    ├── libexec
    ├── LICENSE.txt
    ├── logs
    ├── NOTICE.txt
    ├── README.txt
    ├── sbin
    └── share

Dockerfile 如下:

FROM tensorflow/serving:1.14.0

RUN apt update && apt install -y openjdk-8-jre

COPY hadoop-2.10.0 /root/hadoop

ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
ENV HADOOP_HDFS_HOME /root/hadoop
ENV LD_LIBRARY_PATH ${LD_LIBRARY_PATH}:${JAVA_HOME}/jre/lib/amd64/server

RUN echo '#!/bin/bash \n\n\
CLASSPATH=$(${HADOOP_HDFS_HOME}/bin/hadoop classpath --glob) tensorflow_model_server --port=8500 --rest_api_port=9000 \
--model_name=${MODEL_NAME} --model_base_path=${MODEL_BASE_PATH}/${MODEL_NAME} \
"$@"' > /usr/bin/tf_serving_entrypoint.sh \
&& chmod +x /usr/bin/tf_serving_entrypoint.sh

EXPOSE 8500
EXPOSE 9000
ENTRYPOINT ["/usr/bin/tf_serving_entrypoint.sh"]

进行构建:

docker build -t tensorflow_serving:1.14-hadoop-2.10.0 .

运行:

docker run -p 9000:9000 --name tensorflow-serving -e MODEL_NAME=yolov3 -e MODEL_BASE_PATH=hdfs://192.168.50.166:9000/pipeline/models -t tensorflow_serving:1.14-hadoop-2.10.0

这样将上面的部署文件稍微修改一下即可使用。

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: tfserving-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: tfserving
    spec:
      containers:
      - name: serving-container
        image: joyme/tensorflow_serving:1.14-hadoop-2.10.0
        ports:
        - containerPort: 8500
        - containerPort: 9000
        env:
        - name: MODEL_NAME
          value: yolov3
        - name: MODEL_BASE_PATH
          value: hdfs://192.168.50.166:9000/pipeline/models

---
apiVersion: v1
kind: Service
metadata:
  labels:
    run: tf-service 
  name: tf-service
spec:
  ports:
  - name: rest-api-port
    port: 8500
    targetPort: 8500
  - name: grpc-port
    port: 9000
    targetPort: 9000
  selector:
    app: tfserving 
  type: NodePort

模型调用

tensorflow-serving 支持两种方式调用模型进行预测: GRPC 和 RESTful api

GRPC的方式如下:

from __future__ import print_function

import grpc
import requests
import tensorflow as tf

from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

IMAGE_URL = 'https://tensorflow.org/images/blogs/serving/cat.jpg'

tf.app.flags.DEFINE_string('server', '192.168.50.201:30806', 'PredictionService host:port')
tf.app.flags.DEFINE_string('image', '', 'path to image in jpeg format')
FLAGS = tf.app.flags.FLAGS

def main(_):
    if FLAGS.image:
        with open(FLAGS.image, 'rb') as f:
            data = f.read()
    else:
        dl_request = requests.get(IMAGE_URL, stream=True)
        dl_request.raise_for_status()
        data = dl_request.content

    channel = grpc.insecure_channel(FLAGS.server)
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

    # Send request
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'resnet'
    request.model_spec.signature_name = 'serving_default'
    request.inputs['image_bytes'].CopyFrom(
            tf.contrib.util.make_tensor_proto(data, shape=[1]))

    result = stub.Predict(request, 10.0) # 10 secs timeout
    print(result)

if __name__ == '__main__':
    tf.app.run()

RESTful API的方式如下

import requests
import json
import base64

with open("cat.jpg", "rb") as image_file:
    encoded_string = base64.b64encode(image_file.read())

headers = {"content-type": "application/json"}
body = {
        "instances": [
            {'b64': encoded_string}
           ]
        }
r = requests.post('http://192.168.50.201:32063/v1/models/resnet:predict', data = json.dumps(body), headers = headers)

print(r.text)

容器标准化

概述

我认为容器标准化可以分为两个角度去讲:

一个是容器的使用和镜像的格式需要规范,这叫做OCI(open container initiative),也就是说,不同技术实现的容器,都可以使用同一种方式运行,同一个镜像也可以在不同的容器技术上运行。

另外一个就是因为Kubernetes的流行,Kubernetes推出了一个CRI(container runtime interface)的接口规范,凡是直接或间接实现了这个接口规范的容器都可以作为Kubernetes的默认容器运行时。

OCI和CRI的制定也意味着容器技术迎来了高速发展。

CRI: container runtime interface

CRI是kubernetes推出的容器运行时接口,有了CRI,不论各种容器化技术是如何实现的,都可以用一个共同的接口对外提供服务。CRI中定义了容器和镜像的接口的接口,基于gRPC调用。具体的可以查看api.proto。下面简单的列一下:

// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // TODO: what must the runtime do after the grace period is reached?
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}
}

// ImageService defines the public APIs for managing images.
service ImageService {
    // ListImages lists existing images.
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    // ImageStatus returns the status of the image. If the image is not
    // present, returns a response with ImageStatusResponse.Image set to
    // nil.
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    // PullImage pulls an image with authentication config.
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    // RemoveImage removes the image.
    // This call is idempotent, and must not return an error if the image has
    // already been removed.
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    // ImageFSInfo returns information of the filesystem that is used to store images.
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}

共包含了两个服务:
– RuntimeService:容器和Sandbox运行时管理。
– ImageService:提供了从镜像仓库拉取、查看、和移除镜像的RPC。

再看一下CRI的架构图:

cri architecture

在kubernetes中,CRI扮演了kubelet和container runtime的通信桥梁。也因为CRI的存在,container runtime和kubelet解耦,就有了多种选择,比如: docker、 CRI-O、containerd、frakti等等。

OCI: open container initiative

这个是由docker和其他的公司推动的容器标准,为了围绕容器格式和运行时制定一个开放的工业化标准,目前主要有两个标准文档:容器运行时标准 (runtime spec)和 容器镜像标准(image spec)。这两个协议通过 OCI runtime filesytem bundle 的标准格式连接在一起,OCI 镜像可以通过工具转换成 bundle,然后 OCI 容器引擎能够识别这个 bundle 来运行容器

oci

下面引用一下其他博客的文字(https://www.jianshu.com/p/62e71584d1cb):

设计考量

操作标准化:容器的标准化操作包括使用标准容器创建、启动、停止容器,使用标准文件系统工具复制和创建容器快照,使用标准化网络工具进行下载和上传。

内容无关:内容无关指不管针对的具体容器内容是什么,容器标准操作执行后都能产生同样的效果。如容器可以用同样的方式上传、启动,不管是PHP应用还是MySQL数据库服务。

基础设施无关:无论是个人的笔记本电脑还是AWS S3,亦或是OpenStack,或者其它基础设施,都应该对支持容器的各项操作。
为自动化量身定制:制定容器统一标准,是的操作内容无关化、平台无关化的根本目的之一,就是为了可以使容器操作全平台自动化。

工业级交付:制定容器标准一大目标,就是使软件分发可以达到工业级交付成为现实

image spec(容器标准包)

OCI 容器镜像主要包括几块内容:

文件系统:以 layer 保存的文件系统,每个 layer 保存了和上层之间变化的部分,layer 应该保存哪些文件,怎么表示增加、修改和删除的文件等

config 文件:保存了文件系统的层级信息(每个层级的 hash 值,以及历史信息),以及容器运行时需要的一些信息(比如环境变量、工作目录、命令参数、mount 列表),指定了镜像在某个特定平台和系统的配置。比较接近我们使用 docker inspect 看到的内容

manifest 文件:镜像的 config 文件索引,有哪些 layer,额外的 annotation 信息,manifest 文件中保存了很多和当前平台有关的信息

index 文件:可选的文件,指向不同平台的 manifest 文件,这个文件能保证一个镜像可以跨平台使用,每个平台拥有不同的 manifest 文件,使用 index 作为索引

runtime spec(容器运行时和生命周期)

容器标准格式也要求容器把自身运行时的状态持久化到磁盘中,这样便于外部的其它工具对此信息使用和演绎。该运行时状态以JSON格式编码存储。推荐把运行时状态的JSON文件存储在临时文件系统中以便系统重启后会自动移除。

基于Linux内核的操作系统,该信息应该统一地存储在/run/opencontainer/containers目录,该目录结构下以容器ID命名的文件夹(/run/opencontainer/containers//state.json)中存放容器的状态信息并实时更新。有了这样默认的容器状态信息存储位置以后,外部的应用程序就可以在系统上简便地找到所有运行着的容器了。

state.json文件中包含的具体信息需要有:

版本信息:存放OCI标准的具体版本号。

容器ID:通常是一个哈希值,也可以是一个易读的字符串。在state.json文件中加入容器ID是为了便于之前提到的运行时hooks只需载入state.json就- – 可以定位到容器,然后检测state.json,发现文件不见了就认为容器关停,再执行相应预定义的脚本操作。

PID:容器中运行的首个进程在宿主机上的进程号。

容器文件目录:存放容器rootfs及相应配置的目录。外部程序只需读取state.json就可以定位到宿主机上的容器文件目录。

容器创建:创建包括文件系统、namespaces、cgroups、用户权限在内的各项内容。

容器进程的启动:运行容器进程,进程的可执行文件定义在的config.json中,args项。

容器暂停:容器实际上作为进程可以被外部程序关停(kill),然后容器标准规范应该包含对容器暂停信号的捕获,并做相应资源回收的处理,避免孤儿进程的出现。

CRI和OCI的对比

OCI是容器技术的开放性标准,而CRI是Kubernetes为了更方便的支持不同的容器技术,而推出的接口标准,与CRI类似的还有CNI和CSI,分别是网络和存储的接口。

可以看一下这张图:

kubelet cri runtime

kubelet有了CRI的接口,可以通过cri-containerd和containerd通信,也可以通过docker-shim和docker通信。注意这里的cri-containerd在containerd v1.2的时候就已经不再使用了,因为containerd本身就支持了CRI的规范。

同时kubernetes还孵化了cri-o这个项目,cri-o直接打通了cri和oci。runc和kata都是oci的具体实现。

所以,用一句话理解:实现了CRI就可以保证被kubernetes使用,实现了OCI就可以在各种设备上无差别的使用各种镜像。

docker、containerd和runc

containerd从docker中分出来的一部分。containerd是负责管理容器生命周期的常驻进程,而runc则是真正负责容器运行的部分。可以通过以下的图来看三者之间的关系:

docker-containerd-runc

containerd会调用多个runc实例来管理多个容器。docker engine则是提供接口给用户使用。

kubernetes当前支持的CRI后端

containerd

containerd的地址:https://github.com/containerd/containerd

先用官网的图片来看一下containerd的架构:

containerd architecture

containerd处于os和clients之间,它使用CRI API提供给Kubelet调用,使用containerd API提供给containerd client调用,使用Metrics API提供给Prometheus监控数据。然后有一层containerd Service Interfaces提供给上层api使用。注意到其中还有一个container-shim打通了Runtime managerOCI runtime的具体实现,比如runcrunhcskata

containerd实现了以下的特性:

  • OCI Image规范的支持
  • OCI Runtime规范的支持(通过runc等)
  • Image的上传和下载
  • 容器运行时和生命周期的支持
  • 创建、修改和删除网络
  • 管理网络命名空间以及将容器加入到现有的网络命名空间
  • 全部镜像的CAS存储的多租户模式支持

cri-o

项目地址:https://github.com/cri-o/cri-o

cri-o

cri-o项目是Kubernetes CRI接口的实现,同时可以兼容OCI标准的容器运行时。这样的能力就使得它可以作为Docker的轻量级的容器运行时的替代方案,使得Kubernetes可以接入符合OCI标准的所有容器运行时,同时也减少了容器开发者们的额外工作量(只需实现OCI标准即可)。

frakti

项目地址:https://github.com/kubernetes/frakti

frakti

frakti是Kubernetes官方推出的一个容器运行时,但是不同于docker这样的利于linux namespace的技术,它是基于虚拟化技术的容器,因此可以带来更好的环境隔离以及独享的内核。

rkt

项目地址:https://github.com/rkt/rkt/

rkt-vs-docker-process-model

rkt是coreos推出的和Docker抗衡的容器产品,不同于现在的Docker往更大更全的方向,不仅仅是容器功能,更集成了Swarm这样的集群方案,rkt注重的是作为运行在linux系统上的容器组件。上图可以看出Docker的架构要更加的复杂。

docker

官网地址: https://docker.com

docker作为Kubernetes的默认容器运行时,其本身在容器领域也占据了绝对的领导地位。

实现了OCI,可以通过cri-o接入kubernetes的项目

runc

项目地址: https://github.com/opencontainers/runc

opencontainers组织推出了OCI的规范,同时也开发了runc作为OCI规范的实现。runc是docker贡献出来的容器运行时,runc不仅是containerd的默认运行时,同时也可以接入到cri-o中。

Clear Containers

https://github.com/clearcontainers/runtime,项目已经不在维护,推荐迁移到Kata Containers

Kata Containers

https://github.com/kata-containers/runtime

Kata Containers和runc这种技术栈是不同的。runc使用的是linux namespace和cgroup来做环境隔离和资源限制,缺点在于使用的仍然是宿主机的内核,这样一旦受到了内核层的影响,会扩散到所有的容器。而Kata Containers使用的是虚拟化的技术,它实际上是一个虚拟机,但是可以像容器那样使用。

Kata Containers是2017年12月启动的项目,结合了Intel Clear Containers和 Hyper.sh RunV的优点,支持不同的主流架构,除x86_64外,还支持AMD64, ARM, IBM p-series and IBM z-series。

下图是kata Containers和传统容器技术的对比:

katacontainers_traditionalvskata_diagram

主要特点如下:

  • 安全性: 使用专用内核,提供了网络、IO和内存的独立,在虚拟化VT扩展的基础上利用硬件强制隔离
  • 性能: 提供与标准Linux容器一致的性能;提高隔离度,而无需增加标准虚拟机的性能。
  • 兼容性: 支持行业标准,包括OCI容器格式,Kubernetes CRI接口以及旧版虚拟化技术。
  • 简单: 消除了在完整的虚拟机内部嵌套容器的要求;标准接口使插入和入门变得容易

下图是Kata Containers的架构:

katacontainers_architecture_diagram

Kubernetes可以通过Hypervisor VSOCK Socket和容器交互。

gVisor

gVisor提供的是一个沙箱容器环境,可以说是传统容器技术和虚拟机容器技术的折中。它使用Go编写了一个可以作为普通非特权进程运行的内核,这个内核实现了大多数的系统调用。所以相比于namespace和cgroup实现的容器,它可以屏蔽掉容器内应用程序的内核调用。相比于虚拟机实现的容器,它更轻量级(作为系统的一个进程运行)。

gvisor

理解kubernetes service

理解service的角度

这篇文章不是关于如何使用kubernetes中的service,而是尝试整理我自己对service的看法,然后加深对service的理解。那么,我是从哪几个角度去看待service呢?

  • service是服务的稳定性保证
  • service是集群中的load balance
  • 通过无selector的service去理解VIP(虚拟ip)
  • service的设计,和不同实现方式的性能

service是服务的稳定性保证

在k8s集群中,无状态的pod副本是可以随时删除、随时创建的,并且重新创建的pod不再保留旧的pod的任何信息,包括ip地址。在这样的情况下,前端应用如何使用后端的这些pod来提供服务就成了问题,因此k8s实现了service这样一个抽象的概念。对于有selector的service,它在被创建的时候会自动创建endpoint资源,这个endpoint中包含了所有的pod的ip和端口,并且在之后的pod的删除、创建中,这个endpoint中会立即更新相关pod的ip和端口信息。同时,service的ip地址是永远固定的,service和endpoint是一一对应的关系。这样,如果前端应用通过固定的service ip来访问pod提供的服务,那么就可以在endpoint中找到一个可用的pod的ip和端口,然后通过一些操作(这个在后面会整理)将数据包转发到指定的pod上即可。

# 你可以通过kubectl查看service和endpoint来加深理解

$ kubectl -n h2o describe svc h2o

Name:              h2o
Namespace:         h2o
Labels:            app=h2o
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"h2o"},"name":"h2o","namespace":"h2o"},"spec":{"clusterIP...
Selector:          app=h2o
Type:              ClusterIP
IP:                None
Port:              web  54321/TCP
TargetPort:        54321/TCP
Endpoints:         10.42.1.33:54321,10.42.2.139:54321
Session Affinity:  None
Events:            <none>


$ kubectl -n h2o get endpoints h2o
NAME   ENDPOINTS                            AGE
h2o    10.42.1.33:54321,10.42.2.139:54321   4h45m

service通过ip地址的固定来保证服务的稳定性。那为啥service就是可以固定不变的呢?这是因为service本身就是一个抽象的概念啊,它不是一个正在运行的进程,只是一条数据,也正因为如此,它的ip地址和端口号也是不存在的,这些都是存储在etcd中的一条数据。那么k8s是如何通过这样一个虚假的ip和端口将请求转发到真实存在的pod中呢?这就是后面要说的内容了。

service是集群中的load balance

在上一节说到,一个service会对应一个endpoint,这个endpoint中会保存所有当前匹配到的pod的ip和端口号。那么现在有一个http请求过来了,发现endpoint中有三个待选的pod,那么我们使用一定的方式比较公平的选择出一个pod,就轻松的达到了负载均衡的效果。

service load balance

那么k8s中,load balance的策略是什么样的呢?因为不同的service实现方式使用的方法不同,这个内容会在后面整理。

通过无selector的service去理解VIP(虚拟ip)

在前面的内容中,service一直和endpoint、pod关联在一起,那么如果我们的service没有selector,就不会创建endpoint了,也不会关联pod。前面也提到了service是一个抽象的概念,其拥有的ip和port都是假的。其实这个叫做VIP(virtual ip)。那么,如何通过无selector的service来理解VIP呢?

在k8s中创建无selector service的时候,不会自动创建关联的endpoint,更不会去匹配pod了。但是这样的service仍然是拥有ip和port的。我们可以尝试一下:

svc-without-selector.yaml

apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 8081
      targetPort: 8081
kubectl apply -f svc-without-selector.yaml

查看一下这个svc的详情:

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         <none>
Session Affinity:  None
Events:            <none>

除了拥有ip和端口号,就什么都没有了。这就是说service为什么就是一条数据的原因,10.43.12.208也就是一个VIP。

对于无selector的service还有一个用处,就是让集群内部的应用可以稳定的访问到集群外部的服务。因为service是稳定的,那么集群内部都可以访问这个service,然后让这个service将请求转发到集群外。

这里我们可以手动创建一个endpoint,这个endpoint包含了集群外的两个http服务

apiVersion: v1
kind: Endpoints
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 192.168.50.99
      - ip: 192.168.50.201
    ports:
      - port: 8081

然后我们先检查一下service,发现endpoints已经更新了。

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         192.168.50.201:8081,192.168.50.99:8081
Session Affinity:  None
Events:            <none>

我们在集群内部访问一下(使用kubectl exec到一个pod上):

$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1
$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1

service的设计,和不同实现方式的性能

service的设计是以提高性能为前提不断的演进的,这里是关于Service的设计讨论: DESIGN: Services v2。感兴趣的还可以看看k8s-release-v1.0的时候对service的描述: Service

service的设计中有4个角色: Pod、 Service、Ambassador、Portal

  • Pod: k8s集群中的最小调度单位,包含一个或多个容器
  • Service: 一组pod的集合,由标签选择器来关联
  • Ambassador: 中文翻译是大使,是一段可执行的逻辑,它负责实现客户端访问Service,然后将请求转发到一个对应的Pod上。这个Ambassador可以是一个云服务商的服务,也可以是一个单独的pod(比如haproxy),或者是每个节点都有的共享进程(kube-proxy)。
  • Portal: 固定的ip:port对,客户端只要访问这个Portal,请求自然会被转发到Ambassador上,客户端不需要理解Ambassador的具体实现。

最初的设计中是有三种方案,

方案一: 每个服务一个ip,共享的Ambassador。这个ip就是上面说的Portal ip。将服务以及ip、端口广播给所有的kube-proxy实例。kube-proxy设置好iptables来“窃取”所有到Portal(ip,port)的请求,然后将这个请求转发到自己的某个端口上。这里kube-proxy扮演的是Ambassador角色,它会使用round-robin的方法来把请求均衡的分发到Service后面的Pod上。这个方案里,有以下的优点和缺点:

优点:
– 不会有端口冲突
– Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
– iptables可以放在root namespace,即使pods重启了也不需要更新iptables(这是因为iptables是负责将到service ip:port的流量转发到kube-proxy的一个端口上即可)。
– 不需要在pod上预先声明需要的Service。

缺点:

  • kube-proxy是多租户的(需要为所有的service做流量转发)
  • 从kube-proxy转发的流量的源ip不是真实的源ip,
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 当service数量级上千后可扩展性不高

方案二: 每个服务一个ip,私有的Ambassador。对每个pod来说,都有一个私有的的ambassador,这要求pod需要先声明它们想先访问那个服务(否则的话,对于集群中的每次Service的添加和删除,都需要kubelet或其他的root-namespace、true-root的用户代理变动到每个pod的namespace下。[iptables规则需要root用户]),这样才能在pod的命名空间下建立iptables规则。

优点:

  • 不会有端口冲突
  • Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
  • 代理不是多租户的
  • 从kube-proxy转发的流量的源ip是真实的源ip,
  • 容易从方案一迁移
  • 需要pod预先声明服务(结构良好)

缺点:

  • iptables是配置在pod的namespace下,但是pod的命名空间重启了就必须重新运行一次
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 需要pod预先声明服务(目前还没实现)

方案三:localhost的portal,私有的ambassador

不同于给service分配ip,而是使用本地的端口号作为portal。

介绍完这三种方案后,就可以引入service最终的演进了: userspace->iptables->ipvs。

这里先放一张iptables的工作流程图,方便理解:

iptables

userspace模式

这里的userspace就是方案一的实现,在k8s 1.0的发布中正式启用。userspace的工作原理图如下:

userspace service overview

这种模式,kube-proxy 会监视 Kubernetes master 对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会在本地 Node 上打开一个端口(随机选择)。 任何连接到“代理端口”的请求,都会被代理到 Service 的backend Pods 中的某个上面(如 Endpoints 所报告的一样)。 使用哪个 backend Pod,是 kube-proxy 基于 SessionAffinity 来确定的。

最后,它安装 iptables 规则,捕获到达该 Service 的 clusterIP(是虚拟 IP)和 Port 的请求,并重定向到代理端口,代理端口再代理请求到 backend Pod。默认情况下,用户空间模式下的kube-proxy通过round-robin选择后端。

这里有一个问题在于,client访问service的clusterIP时,iptables会把流量转发到kube-proxy的某个端口上,这样的话,每次转发都有一个内核态用户态的转换。

iptables模式

iptables service overview

这种模式,kube-proxy 会监视 Kubernetes 控制节点对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会安装 iptables 规则,从而捕获到达该 Service 的 clusterIP 和端口的请求,进而将请求重定向到 Service 的一组 backend 中的某个上面。 对于每个 Endpoints 对象,它也会安装 iptables 规则,这个规则会选择一个 backend 组合。

默认的策略是,kube-proxy 在 iptables 模式下随机选择一个 backend。类似于这样

iptables -t nat -A PREROUTING -p tcp -d 15.45.23.67 --dport 80 -j DNAT --to-destination 192.168.1.1-192.168.1.10

使用 iptables 处理流量具有较低的系统开销,因为流量由 Linux netfilter 处理,而无需在用户空间和内核空间之间切换。 这种方法也可能更可靠。

如果 kube-proxy 在 iptable s模式下运行,并且所选的第一个 Pod 没有响应,则连接失败。 这与用户空间模式不同:在这种情况下,kube-proxy 将检测到与第一个 Pod 的连接已失败,并会自动使用其他后端 Pod 重试。

您可以使用 Pod readiness 探测器 验证后端 Pod 可以正常工作,以便 iptables 模式下的 kube-proxy 仅看到测试正常的后端。 这样做意味着您避免将流量通过 kube-proxy 发送到已知已失败的Pod。

ipvs模式

ipvs是在Kubernetes v1.11正式可用的。ipvs也是依赖于iptables的,但是它的性能更高。

ipvs service overview

在ipvs模式下,kube-proxy监视Kubernetes服务和端点,调用netlink接口相应地创建IPVS规则,并定期将IPVS规则与Kubernetes服务和端点同步。该控制循环可确保IPVS状态与所需状态匹配。访问服务时,IPVS 将流量定向到后端Pod之一。

IPVS代理模式基于类似于iptables模式的netfilter挂钩函数,但是使用哈希表作为基础数据结构,并且在内核空间中工作。 这意味着,与iptables模式下的 kube-proxy 相比,IPVS 模式下的 kube-proxy 重定向通信的延迟要短,并且在同步代理规则时具有更好的性能。与其他代理模式相比,IPVS 模式还支持更高的网络流量吞吐量。

IPVS提供了更多选项来平衡后端Pod的流量。 这些是:

  • rr: round-robin
  • lc: least connection (smallest number of open connections)
  • dh: destination hashing
  • sh: source hashing
  • sed: shortest expected delay
  • nq: never queue

注意:
要在IPVS模式下运行kube-proxy,必须在启动kube-proxy之前使IPVS Linux在节点上可用。

当 kube-proxy 以 IPVS 代理模式启动时,它将验证 IPVS 内核模块是否可用。 如果未检测到 IPVS 内核模块,则 kube-proxy 将退回到以 iptables 代理模式运行。

ipvs在同步规则、网络带宽、cpu/内存消耗上都明显优于iptables,关于具体的性能数据可以看这篇文章: 华为云在 K8S 大规模场景下的 Service 性能优化实践。ipvs的详细介绍可以看这篇文章:ipvs 基本介绍。ipvs和iptables的对比:kube-proxy 模式对比:iptables 还是 IPVS?

rook ceph的rgw崩溃问题排查

问题

在开发可视化机器学习平台时,集成的FastRCNN实验一直跑不到结束就会出错。有时候是在下载基础模型以及代码包时出错,有时候在train结束后向predict传递artifacts出错。

过程

首先这个问题出现在局域网内,处于开发环境,因此ceph没有做高可用的部署。其次,ceph是用rook这个项目部署在k8s集群中的。

最后,在使用argo做机器学习的资源调度时,会出现大的数据资源下载和转移出现错误。具体表现为:大量数据下载会出现connectiion refused,日志如下:

2019-10-25 02:35:24 (20.1 MB/s) - Connection closed at byte 528482304. Retrying.
--2019-10-25 02:35:25--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/tho6wHm0UmZeZYbfWBv5lkOZ576838763
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

大量数据上传时也会中断,导致argo无法调用下一步:

日志如下:

NAME            custom-workflow-43-6rhlb.api-train-faster-1699
TYPE            Pod
PHASE           Error
MESSAGE         failed to save outputs: timed out waiting for the condition
START TIME      2019-10-24T06:15:56Z
END TIME        2019-10-24T06:30:34Z
DURATION        14:38 min

这里可能是网络问题,argo的问题或者是ceph的问题。但是当数据量不大的时候不会出现错误。因此检查ceph是否正常

~ » kubectl -n rook-ceph get pods                                                
NAME                                           READY   STATUS      RESTARTS   AGE
csi-cephfsplugin-964zm                         3/3     Running     27         46d
csi-cephfsplugin-dxnbg                         3/3     Running     12         46d
csi-cephfsplugin-provisioner-b66d48bc8-fglq9   4/4     Running     0          12d
csi-cephfsplugin-provisioner-b66d48bc8-x67pd   4/4     Running     0          12d
csi-rbdplugin-5fs2x                            3/3     Running     27         46d
csi-rbdplugin-bddlt                            3/3     Running     12         46d
csi-rbdplugin-provisioner-95dd85d6-7kc4c       5/5     Running     0          12d
csi-rbdplugin-provisioner-95dd85d6-mpjtj       5/5     Running     0          12d
rook-ceph-agent-fs4xq                          1/1     Running     9          46d
rook-ceph-agent-wx6r4                          1/1     Running     4          46d
rook-ceph-mds-myfs-a-774974c8c4-xt2ls          1/1     Running     0          12d
rook-ceph-mds-myfs-b-748d7d7f7d-wftt5          1/1     Running     0          12d
rook-ceph-mgr-a-5f54d44c98-57qcb               1/1     Running     0          12d
rook-ceph-mon-a-6f9fbfc99d-lmb6c               1/1     Running     0          17d
rook-ceph-operator-6f556bcbff-glvt6            1/1     Running     0          12d
rook-ceph-osd-0-7c489dc87b-wkt7x               1/1     Running     0          17d
rook-ceph-osd-1-86cc67cc45-25h4q               1/1     Running     0          12d
rook-ceph-osd-prepare-worknode1-xnmpw          0/1     Completed   0          12d
rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm      1/1     Running     65         12d
rook-ceph-tools-5f5dc75fd5-52jbj               1/1     Running     0          12d
rook-discover-g95ws                            1/1     Running     6          46d
rook-discover-vs5fs                            1/1     Running     13         46d

发现ceph rgw重启了65次,这个肯定是不正常的。查看ceph rgw的日志:

$ kubectl -n rook-ceph logs -p rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm

# 截取了一部分日志
debug 2019-10-25 02:35:13.473 7f4880b98700  1 ====== starting new request req=0x55aa678c48e0 =====
debug 2019-10-25 02:35:14.549 7f4880b98700  0 ERROR: client_io->complete_request() returned Broken pipe
debug 2019-10-25 02:35:14.549 7f4880b98700  1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== starting new request req=0x55aa54a488e0 =====
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== req done req=0x55aa54a488e0 op status=0 http_status=404 latency=0s ======

在我的理解中broken pipe一般出现在向已关闭的连接中写入数据时,会出现这个问题。但是通过日志可以发现,出现broken pipe的错误之后,rgw仍然是在处理请求的,但是部分请求的latency很高。因此这里的broken pipe是表示着rgw开始出现一些异常情况,但不是pod重启的直接原因。

真正导致rgw被杀死的原因是因为rgw进程收到了sigterm信号,然后进程被杀死。

debug 2019-10-25 02:35:24.525 7f494c52f700 -1 received  signal: Terminated from Kernel ( Could be generated by pthread_kill(), raise(), abort(), alarm() ) UID: 0
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm set alarm for 120
debug 2019-10-25 02:35:24.525 7f4962116780 -1 shutting down
debug 2019-10-25 02:35:24.629 7f488cbb0700  0 iterate_obj() failed with -9

使用kubectl describe查看pod的event:

Events:
  Type     Reason     Age                  From                Message
  ----     ------     ----                 ----                -------
  Normal   Killing    15m (x65 over 12d)   kubelet, worknode1  Container rgw failed liveness probe, will be restarted
  Warning  Unhealthy  15m (x249 over 12d)  kubelet, worknode1  Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
  Normal   Pulled     15m (x66 over 12d)   kubelet, worknode1  Container image "ceph/ceph:v14.2.2-20190826" already present on machine
  Normal   Created    15m (x66 over 12d)   kubelet, worknode1  Created container rgw
  Normal   Started    15m (x66 over 12d)   kubelet, worknode1  Started container rgw

这里才是真正的重启原因,kubelet检查pod是否存活,但是请求超时了。意味pods出现的故障,因此杀死了pod并重启。

pod的liveness设置:

Liveness:       http-get http://:80/ delay=10s timeout=1s period=10s #success=1 #failure=3

k8s的liveness机制是检查pod中应用程序存活状态并在出错后自动重启的一种机制。提供了三种方式:

  • 在容器内执行命令,如果执行成功,则表示容器是存活并且健康的。否则就重启容器使得应用程序恢复正常。
  • 使用http请求检查,如果返回的状态码是200则表示正常,否则表示失败。
  • 使用tcp连接检查,如果kubelet可以打开指定端口的socket连接,则表示正常,否则表示失败。

在这个场景下出现Warning Unhealthy 15m (x249 over 12d) kubelet, worknode1 Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers),表示kubelet使用http get检查pod的80端口,但是这个请求却超时了。因此杀死了容器并重启,导致大文件(700MB以上)上传/下载失败。

这里kubelet检查的是http://10.42.2.44:80这个地址,我们回过头看一下argo那边的报错信息,Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.。都是80端口,当然这里千万不能被ip地址误导了,10.42.2.44是pod的ip地址,10.43.126.166是service的ip地址,我们可以验证一下:

~ » kubectl -n rook-ceph get svc                                                 
NAME                              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
rook-ceph-rgw-my-store            ClusterIP   10.43.126.166   <none>        80/TCP              46d

然后再结合之前debug 2019-10-25 02:35:14.549 7f4880b98700 1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======这条日志,latency已经超过了1s,而kubelet的liveness超时时间是1s。

现在基本可以得出以下异常流程:

  1. 因为某些原因,导致rgw出现broken pipe的出错,并且部分请求的lantency时间大大提高。
  2. kubelet周期性的对rgw做liveness的检查,并且检查的http就是rgw的80端口,这个端口因为上面的原因导致lantency超过了1s,而liveness检查的timeout只有1s。因此kubelet认为该pod不健康,选择重启。
  3. kubelet向rgw发送了sigterm信号,rgw关闭进程,pod重启。

猜测可能造成这个问题的原因:

  1. ceph所在机器的性能不够,导致响应请求出现问题。
  2. 局域网的网络问题,因为内部的最高带宽只有10MB/s,但是局域网内的设备很多,网络这部分导致了瓶颈。

关于机器性能的问题,我认为是可以排除的,因为机器性能本身很好,并且开发环境几乎没有请求量,接下来就是验证是因为网络问题导致瓶颈,造成部分接口延迟过高被杀死。

为了验证这个猜想,假设这里有三台机器A,B,C,组成了一个k8s集群,ceph是部署在k8s之上的。在A之上,我用dd命令产生一个40G的大文件,然后在B之上使用wget下载。然后在C上面观察ping的延迟是否上升。

A:

$ dd if=/dev/zero of=test bs=1M count=0 seek=40000

$ python -m SimpleHTTPServer 8999

B:

$ wget http://192.168.50.37:8999/test

--2019-10-25 14:18:30--  http://192.168.50.37:8999/test
正在连接 192.168.50.37:8999... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 41943040000 (39G) [application/octet-stream]
正在保存至: “test”

test      3%[==>            ]   1.54G  11.1MB/s    剩余 73m 5s

C:

$ ping 192.168.50.37
PING 192.168.50.37 (192.168.50.37) 56(84) bytes of data.
64 bytes from 192.168.50.37: icmp_seq=1 ttl=64 time=0.384 ms
64 bytes from 192.168.50.37: icmp_seq=2 ttl=64 time=0.373 ms
64 bytes from 192.168.50.37: icmp_seq=3 ttl=64 time=0.336 ms
64 bytes from 192.168.50.37: icmp_seq=4 ttl=64 time=4.90 ms
64 bytes from 192.168.50.37: icmp_seq=5 ttl=64 time=1.18 ms
64 bytes from 192.168.50.37: icmp_seq=6 ttl=64 time=7.74 ms
64 bytes from 192.168.50.37: icmp_seq=7 ttl=64 time=3.51 ms
64 bytes from 192.168.50.37: icmp_seq=8 ttl=64 time=6.66 ms
64 bytes from 192.168.50.37: icmp_seq=9 ttl=64 time=6.31 ms

网络延迟增加的还是很明显的。

这时候使用argo开始一个新的机器学习的实验,但是这个实验的数据量较小,在之前的使用中都没有问题。结果确实出现了问题:

2019-10-25 06:21:04 (8.10 MB/s) - Connection closed at byte 136314880. Retrying.
--2019-10-25 06:21:05--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/maC8Om6Y3QbSJxSTT9x82rp8908272441
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

那么为了有对照实验,将下载关闭,重新做这个机器学习的实验,结果正常。

解决方法

因为缺少了对ceph这块源代码的研究,上面的结论并不一定正确。但是可以大概得出如何解决,可以先尝试将liveness检测的timeout时间增加。

kubectl -n rook-ceph edit deployment rook-ceph-rgw-my-store-a

把liveness的timeout时间调成5s,这样就解决了这个问题。

理解go context

理解context

在我刚接触context包时,我是有一点迷惑的。因为在其他的编程语言中很少有接触到context包类似的用法。比如在js绘制canvas中的context,也只是作为保留上下文操作来用的。在go语言的context包中,同样也可以当成上下文来理解,但是在看待context提供的能力时,要从以下两点来理解:

  • context提供了一种管理多个goroutine的机制。
  • context最终形成了一种树形结构。

在深入之前,让我们回忆一下多线程/进程模型中,主线程/进程是如何管理子线程/进程的。如果子线程/进程又派生了其他的线程/进程呢?这一定是一个头疼的问题。

在go语言中,协程也面临了同样的问题。因此官方在go1.7版本中引入了context包。那么context提供了什么样的能力来管理协程呢?先看一个withCancel的简单的例子

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("退出")
            return

        default:
            log.Println("执行逻辑")
            time.Sleep(2 * time.Second)
        }
    }
}

func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go watch(ctx)
    go watch(ctx)
    go watch(ctx)

    time.Sleep(6 * time.Second)
    fmt.Println("可以了,通知子协程停止")
    cancel()
    //为了检测子协程是否停止,如果没有输出,就表示停止了
    time.Sleep(5 * time.Second)
}

调用withCancel的输出如下:

2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
可以了,通知子协程停止
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出

可以看到,我们通过context.WithCancel方法生成了一个ctx和一个cancel,然后主动调用cancel就可以通过所有的子协程退出了。在watch方法的实现中,我们是通过select机制来实现的,一旦context的Done()方法有值,就会调用return退出,否则的话就执行default中我们的业务逻辑。

这样我们就可以随时通知所有的子协程退出了。在上面说到,context最终形成了一种树形结构,是因为在子协程中也可以继续使用新的协程,这样就形成了一个树形的调用了。

协程的树形结构

我们在1中使用cancel方法,就可以向下传播,在2~10号协程中全部退出。

简单的了解context包的使用后,可以看一下context.Context这个接口,为了简洁,我删除了源代码中的注释。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Context接口总共提供了4个方法。

  • Deadline()用来获取当前context的取消时间,第二个返回值ok等于false的时候,表示没有设置。
  • Done()方法返回了一个chan,当chan中读取到值的时候,表示父context已经发起了取消的请求,那么当前协程开始做相关的清理工作然后退出。
  • Err()返回context的取消原因
  • Value()方法用来通过一个key获取当前Context上与之对应的值。

理解Context的树形结构

go中大量的库都使用了context机制,比如database/sql库,net/http库等等,因为这些库都支持了context,使得我们在程序中很容易通过context来管理所有新建的协程,而不用自己实现复杂的机制来管理。一旦我们需要取消,只需要在root context调用cancel方法即可。

一些基本使用

在上面的例子中,我们使用了withCancel来实例化一个可以手动取消的context。context包中同样提供了一些其他的方法。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithDeadline可以设置截止时间。会到达指定时间时自动取消。当然也可以调用CancelFunc来手动取消。

WithTimeout可以设置在一段时间后自动取消,和WithDeadline类似。

参考

Go语言实战笔记(二十)
Golang Context深入理解
Go Concurrency Patterns: context

etcd分布式锁的实现方式

一、概述

在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是类似的,为了了解其中的工作机制,这里简要的做一下总结。

二、使用方式

etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下几个方法:

  • func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
  • func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
  • func (m *Mutex) Unlock(ctx context.Context) error,解锁

因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。

一个简单的例子如下:

package main

import (
    "context"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "sync"
    "time"
)

var n = 0

// 使用worker模拟锁的抢占
func worker(key string) error {
    endpoints := []string{"127.0.0.1:2379"}

    cfg := clientv3.Config{
        Endpoints:            endpoints,
        DialTimeout:          3 * time.Second,
    }

    cli, err := clientv3.New(cfg)
    if err != nil {
        log.Println("new cli error:", err)
        return err
    }

    sess, err := concurrency.NewSession(cli)
    if err != nil {
        return err
    }

    m := concurrency.NewMutex(sess, "/"+key)

    err = m.Lock(context.TODO())
    if err != nil {
        log.Println("lock error:", err)
        return err
    }

    defer func() {
        err = m.Unlock(context.TODO())
        if err != nil {
            log.Println("unlock error:", err)
        }
    }()

    log.Println("get lock: ", n)
    n++
    time.Sleep(time.Second) // 模拟执行代码


    return nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()


    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    wg.Wait()
}

三、实现机制

Lock()函数的实现很简单。这里可以贴出来看一下:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmpputgetgetOwner。需要注意的是,key是由pfxLease()组成的。

  • cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
  • put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
  • get: get就是通过key来查询
  • getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate()。使用pfx来查询是因为其他的session也会用同样的pfx来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put成功。但是只有最早使用这个pfxsession才是持有锁的,所以这个getOwner的含义就是这样的。

接下来才是通过判断来检查是否持有锁

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是当前的版本号,resp.Succeededcmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。

下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}

waitDeletes方法的实现也很简单,但是需要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。

这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx来put,然后根据各自的版本号来排队获取锁。效率非常的高。

etcd 分布式锁

如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。

当然,这里为什么可以通过revision来判定获取锁的顺序,就需要更深入的了解etcd的内部机制以及raft协议了。

分布式文件上传方案

一、背景

考虑可扩展性,后台的服务肯定是要能够支持任意的扩展的,这样才能在业务量增长时通过增加机器的方式来应对。这对后台服务提出了一个要求,必须处理好分布式环境和单机环境的不同带来的问题。比如:在文件的分片上传这一场景下,应该负载均衡的问题,一个文件的多个分片请求会分布到不同的服务器上,这导致在将多个分片合并成完整文件时出现问题,而单机情况下则完全不会有这样的问题。

二、难点

这个问题的解决方案有很多种,但是需要根据实际情况尽量选择简洁、易部署和维护的方案进行,并且不能丢掉分布式系统的优点。比如网上的有的方案是使用单独的文件上传服务器,但是这就变成了单机服务了。也有使用NFS挂载的方案,即所有的服务器挂载一个相同的NFS目录,所有上传相关的文件都存放在挂载的目录下,这不仅给运维带来了麻烦,为了保证NFS的高可用,也带来了额外的运维成本。

三、解决方案

3.1 借助负载均衡

借助负载均衡的方案很简单,这是和应用无关的一种方法。即通过负载均衡这一层,将同一个文件的不同分片的请求全部导向到同一个服务器上。比如负载均衡这一块使用的是nginx, 通过url hash的方式来完成,可以使用如下的配置:

upstream backend {
    server 0.0.0.0:8080;
    server 0.0.0.0:8081;
    server 0.0.0.0:8082;
    hash $request_uri;
}
server {
    location / {
        proxy_pass http://backend;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $host;
    }
}

然后写一个简单的服务来验证一下:

func post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    resp.WriteHeader(http.StatusOK)
    _, err := resp.Write([]byte("ok"))
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.Parse()

    http.HandleFunc("/Post", post)
    log.Println("listen ", addr)
    err := http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

我们启动三个服务:

./godemo -addr 0.0.0.0:8080
./godemo -addr 0.0.0.0:8081
./godemo -addr 0.0.0.0:8082

使用curl来post数据过来,请求的地址类似于: http://0.0.0.0/Post?123456?后面可以当成是文件的唯一标识码,比如文件和用户id的组合的md5信息等,这样对于同一个用户上传的同一个文件的不同分片请求,通过url哈希都会得到同样的结果,这样就会被转发到同一个后端服务器。

curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?123456 -d "key=value&identity=123456" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST
curl http://0.0.0.0/Post\?789abc -d "key=vvvvv&identity=789abc" -X POST

结果如下:

端口8080的服务收到了三条请求:
2019/09/26 13:58:52 identity: 123456, value: value
2019/09/26 13:58:56 identity: 123456, value: value
2019/09/26 13:59:03 identity: 123456, value: value

端口8081的服务收到了三条请求:
2019/09/26 13:59:40 identity: 789abc, value: vvvvv
2019/09/26 13:59:43 identity: 789abc, value: vvvvv
2019/09/26 13:59:44 identity: 789abc, value: vvvvv

如果在k8s集群中部署,使用nginx-ingress的话可以使用以下的部署方案:

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
    name: pipeline-ingress
    namespace: default
    annotations:
      nginx.ingress.kubernetes.io/proxy-body-size: "50m"
      nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"
spec:
    rules:
        - host: pipeline.dev.com
          http:
              paths:
                  - path: / 
                    backend:
                        serviceName: pipeline-service
                        servicePort: 8888

当然这种方式需要注意你的每个请求的URI都需要加上额外的参数(比如用户的userId的md5),这样才能均匀的分布到不同的服务器上。

3.2 后台程序自动proxy请求

这个方案的思路是集群中的每个服务实例都有单独的标识,文件分片在第一次上传时会返回给它一个该请求所属服务器的identity,之后所有的请求会带上这个identity,之后收到请求的服务器会检查这个identity是不是属于自己,如果不属于自己,则把这个请求转发给所属的服务器。

这个方案要求每台服务器都知道其他所有服务器的identity和地址。这里我们可以使用etcd这样的分布式数据库来存储。每台服务器在启动时都向etcd里注册自己的identity和address,之后服务器转发的时候都向etcd里面查找对应的address即可。

下面是一个示例的代码:

package main

import (
    "context"
    "encoding/json"
    "flag"
    "io/ioutil"
    "log"
    "net/http"
    "go.etcd.io/etcd/clientv3"
    "net/url"
    "time"
)

type Server struct {
    Etcd string
    Addr string
    Identify string
}

type ResponseObj struct {
    Identity string `json:"identity,omitempty"`
    Value string `json:"value"`
}

func getEtcdKV(etcd string) (clientv3.KV, error) {
    cfg := clientv3.Config{
        Endpoints:               []string{etcd},
        // set timeout per request to fail fast when the target endpoint is unavailable
        DialTimeout: time.Second,
    }

    cli, err := clientv3.New(cfg)

    if err != nil {
        return nil, err
    }

    return clientv3.NewKV(cli), nil
}

func httpProxy(anotherServer string, body map[string]string) (*http.Response, error) {
    formData := url.Values{}

    for k, v := range body {
        formData.Set(k ,v)
    }

    return http.PostForm(anotherServer, formData)
}

func (s *Server) Register() error {
    cli, err := getEtcdKV(s.Etcd)
    if err != nil {
        return err
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
    _, err = cli.Put(ctx, s.Identify, "http://" + s.Addr)
    cancel()
    if err != nil {
        return err
    }

    return nil
}

func (s *Server) Post(resp http.ResponseWriter, req *http.Request) {
    v := req.PostFormValue("key")
    identity := req.PostFormValue("identity")
    log.Printf("identity: %v, value: %v\n", identity, v)

    var data ResponseObj

    if identity != "" && identity != s.Identify {
        // proxy
        cli, err := getEtcdKV(s.Etcd)
        if err != nil {
            log.Println("get kv client error", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        etcdResp, err := cli.Get(context.Background(), identity)
        if err != nil {
            log.Println("get value error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        if len(etcdResp.Kvs) == 0 {
            log.Println("没有值")
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        proxyAddr := string(etcdResp.Kvs[0].Value)

        log.Println("proxy to: ", proxyAddr)

        text := map[string]string {
            "key": v,
            "identity": identity,
        }

        proxyResp, err := httpProxy(proxyAddr+"/Post", text)
        if err != nil || proxyResp.Body == nil {
            log.Println("http proxy error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        bodyData, err := ioutil.ReadAll(proxyResp.Body)
        if err != nil {
            log.Println("read proxy body error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }

        err = json.Unmarshal(bodyData, &data)
        if err != nil {
            log.Println("json unmarshal error: ", err)
            resp.WriteHeader(http.StatusInternalServerError)
            resp.Write([]byte("error"))
            return
        }
    } else {
        data.Identity = s.Identify
        data.Value = v
    }

    text, err := json.Marshal(data)
    if err != nil {
        log.Println("marshal json error: ", err)
        resp.WriteHeader(http.StatusInternalServerError)
        return
    }

    resp.WriteHeader(http.StatusOK)
    _, err = resp.Write(text)
    if err != nil {
        log.Println("error:", err)
    }
}

func main() {
    var addr string
    var identity string
    var etcd string
    flag.StringVar(&addr, "addr", "0.0.0.0:8080", "http listen addr:port")
    flag.StringVar(&identity, "identity", "", "identify the server")
    flag.StringVar(&etcd, "etcd", "http://127.0.0.1:2379", "etcd server url")
    flag.Parse()

    if identity == "" {
        log.Fatal("identity不可为空")
    }

    var server = &Server{
        Addr:     addr,
        Identify: identity,
        Etcd: etcd,
    }

    err := server.Register()
    if err != nil {
        log.Fatal("register server error, check your etcd server: ", err)
    }

    http.HandleFunc("/Post", server.Post)
    log.Println("listen ", addr)
    err = http.ListenAndServe(addr, nil)
    if err != nil {
        log.Fatal(err)
    }
}

3.3 基于S3存储的方案

兼容S3的存储系统有一个对外的接口叫做: ComposeObject。这个接口可以将多个文件合并成一个文件存储到指定位置。如果我们的底层存储是基于兼容S3的系统,那么就可以利用这个接口轻松的实现。

方案的步骤可以描述成以下:

  1. 前端实现将文件分片,然后上传
  2. 上传的请求会因为前端负载均衡的原因分布到不同的服务器,每个上传请求都要带上这个文件、用户id、随机字符串组合的md5值,服务器收到请求后,只管将分片上传到以md5值为名的目录下,一旦上传成功,使用etcd或redis这样作为分片计数器+1.
  3. 分片总数等于总分片数时调用ComposeObject组合所有分片存储到指定位置即可。

ceph架构研究

这篇文章的绝大多数内容都是官网原文的翻译:Ceph Architecture

ceph是什么

ceph是一个分布式的文件存储系统,它提供了以下三种存储能力:

  • 块设备存储(block device)
  • 文件系统存储(filesystem)
  • 对象存储(object storage)

因此如果你需要多种存储方案的话,ceph是一个非常好的选择。这几种存储方案的差别是什么呢?

块设备存储是将底层的存储能力以逻辑硬盘的方式暴露给主机使用。主机只感知到单块物理硬盘的挂载,底层的复杂机制都被屏蔽

对象存储是将文件抽象成对象,然后通过网络接口(比如s3的api)来对文件进行操作(put,get,delete等)。

文件系统存储是像FTP,NFS这种,可以将ceph的存储能力当成普通的文件系统来使用,也支持cd, ls这样的文件系统命令。

ceph filesystem

ceph 架构一览

ceph arch

通过上图可以看出,ceph的底层核心是RADOS,然后通过LIBRADOSCEPHFS以及基于LIBRADOS的RADOSGW, RBD对外提供存储功能。

ceph 存储集群

ceph基于RADOS提供了无限的可扩展的存储集群。关于RADOS的知识可以看这篇论文: RADOS – A Scalable, Reliable Storage Service for Petabyte-scale Storage Clusters

一个Ceph存储集群包括两类常驻服务:

  • Ceph Monitor
  • Ceph OSD Daemon

ceph daemons

Ceph Monitor维护了一个关于集群信息映射的主副本。Ceph monitors集群保证了在一个monitor服务停止时的高可用性。存储集群客户端从Ceph Monitor获取一份集群信息映射的拷贝。

Ceph OSD服务负责检查自身的状态以及其他OSD的状态,然后汇报给monitors。

存储集群客户端和每一个Ceph OSD服务使用CRUSH算法来高效的计算数据位置的信息,而不是依赖于一个中心化的查找表。librados是一个很重要的角色,它提供了Ceph的高层特性,一系列的服务接口都是建设在librados上层的。

存储数据

Ceph存储集群从Ceph客户端接受数据,这里的客户端可以是Ceph块设备存储,对象存储,文件系统或者是基于librados的自定义实现。librados将数据作为一个对象存储。每一个对象都在文件系统中对应一个文件,这些文件存储在对象存储设备上。Ceph OSD服务在存储磁盘上处理这些读写操作。

osd handle read/write operations

Ceph OSD服务在一个扁平的命名空间中(没有目录结构)存储所有的数据,每个数据都作为一个对象。一个对象有唯一标识符,二进制数据,以及有一系列键值对的元数据。这完全取决于Ceph客户端的实现。举例来说,CephFS使用元数据来存储文件属性,比如文件拥有者,创建日期,最后修改日期等等。

ceph object

注意: 对象ID是在整个集群中唯一的,而不仅仅是本地文件系统。

扩展性和高可用

在传统架构中,客户端和一个中心化的组件(比如gateway, broker, API, facade等)通信。这个中心化组件在一个复杂系统中扮演了一个单点的入口。这在性能和扩展性上都导致了局限性————单点失败。(比如,如果中心组件宕机,整个集群都不可用)。

Ceph消灭了这个中心化的入口,让客户端直接和Ceph OSD服务通信。Ceph OSD服务在其他Ceph节点上创建对象副本来保证数据安全和高可用。Ceph同样使用一个monitor的集群来保证高可用。为了消灭中心化,Ceph使用了一个叫做CRUSH的算法。

CRUSH介绍

Ceph客户端和Ceph OSD服务都使用CRUSH算法来高效的计算对象存储位置信息,而不是必须取决于一个中心化的查找表。相比于之前的方案,CRUSH提供了一个更好的数据管理机制,通过清晰的将工作分布到集群中的所有客户端和OSD服务,可以达到很好的扩展性。CRUSH使用智能的数据复制来保证灵活性,这更适用于超大规模的存储。

集群映射表

Ceph依赖于Ceph客户端和OSD服务,它们包含了5个映射表。

  • 监控映射表: 包含集群的fsid, 位置, 名字地址以及每个monitor的端口。它同样记录了当前的代数,映射表的创建时间,最后一次修改时间。可以使用ceph mon dump来查看一个监控表

  • OSD映射表: 包含集群的fsid, 这个映射表的创建时间和最后修改时间,pools的列表,副本数量,PG数量,OSD的列表以及他们的状态。使用ceph osd dump来查看osd表。

  • PG映射表: 包含PG版本号, 时间戳,最新的OSD映射表的代数,全比率,每一个放置组的详情,比如PG ID, Up Set, Acting Set,以及PG的状态(比如active + clean),每一个pool的数据使用统计信息。

  • CRUSH映射表: 包含存储设备的列表,失败域的层级(比如device,host,rack,row,room等),存储数据时的遍历层级的规则。

  • MDS映射表: 包含当前的MDS映射表的代数, 映射表的创建时间,最后修改时间。它同样包含存储元数据的池,元数据服务器的列表,那些元数据服务器是up和in的。执行ceph fs dump来查看MDS映射表。

每一个映射表都包含它的操作状态改变的可迭代历史。Ceph Monitors维护一个集群映射表的主拷贝,包含了集群成员,状态,改变,以及Ceph存储集群的全局健康状态。

高可用的监控服务

在Ceph客户端可以读或写数据前,它们都必须和一个Ceph监控服务通信来获取最新的集群映射表的拷贝。一个Ceph存储集群可以只有一个Monitor角色。当然,这会导致单点失败。

为了增加可靠性和错误容忍,Ceph支持监控服务的集群。在一个监控服务的集群中,延迟和其他错误会导致一个或多个监控服务落后于集群的当前状态。因此,Ceph必须在各个监控服务实例之间就集群状态达成一致。Ceph总是相信大多数的监控服务。Paxos算法被用来就当前集群状态达成共识。

高可用的认证

为了鉴别用户以及防止中间人攻击,Ceph提供了cephx认证系统来认证用户和后台服务。

cephx协议不致力于传输过程中数据加密(比如SSL/TLS)或者是其他的加密。

Cephx使用共享的秘钥来认证,这意味着客户端和监控集群都有一份客户端秘钥的拷贝。认证协议需要双方都能证明给对方自己拥有这个秘钥的拷贝,但是又不能把秘钥原文说出来。这提供了两端的验证,意味着集群确信用户有这个秘钥,用户也确信集群有这个秘钥。

Ceph的一个关键的可扩展的特性是避免对Ceph object store的实现有中心化接口,这意味着Ceph客户端必须能够和OSD直接通信。为了保护数据,Ceph提供了cephx认证系统。cephx协议的机制和Kerberos类似。

用户调用Ceph客户端来和监控服务器通信。和Kerberos不同的是,每一个监控服务都可以验证用户以及分发私钥,所以在使用cephx时不再有单点失败或瓶颈了。监控服务返回一个认证数据结构,类似于Kerberos ticket,包含获取Ceph服务的会话秘钥。会话秘钥被用户永久秘钥加密过,因此只有用户可以向Ceph监控服务请求服务。之后客户端使用会话秘钥来请求它想要的服务,监控服务向客户端提供一个ticket,这个ticket可以向OSD认证客户端来处理数据。
Ceph Monitors和OSD共享秘钥,因此客户端可以使用监控服务提供的ticket和任意的OSD或元数据服务器通信。和Kerberos一样,cephx ticket会过期,因此攻击者不能使用过期的ticket或会话秘钥来获取服务。

为了使用cephx,管理员必须首先设置用户。在下图中,client.admin这个用户从命令行中调用ceph auth get-or-create-key来生成用户名和秘钥。Ceph的认证系统生成用户名和秘钥,存储一份拷贝给所有的监控服务,然后将用户秘钥传送给client.admin用户。这意味着客户端和监控服务共享同一个秘钥。

request create user

为了通过监控服务的认证,客户端把用户名发送给监控服务,监控服务生成一个会话钥匙,使用秘钥附加上用户名来加密。之后,监控服务把加密后的ticket返回给客户端。客户端使用共享的秘钥解密,获取到会话钥匙。会话钥匙为当前的会话提供认证。客户端之后请求由这个会话钥匙签名的ticket。监控服务生成ticket,使用用户秘钥加密并返回给客户端。客户端解密ticket,使用它来签名给OSD和元数据服务器的请求。

client to osd

cephx协议认证每个在客户端机器和Ceph服务器之间的会话。每一个在客户端和服务器之间发送的信息,都会接连通过初始化认证,使用ticket签名,这个签名可以由监控服务,OSD和元数据服务器用他们共享的秘钥来验证。

complete process

由这种认证提供的保护是在Ceph客户端和Ceph服务端之间的。在Ceph客户端之外这个认证是不管用的。如果用户通过远程连接到Ceph客户端上,Ceph的认证不会应用到这个远程连接上。

智能的后台服务造就了超大规模的可扩展性

在很多集群架构中,集群成员的主要目的是通过一个中心化的接口知道哪些节点可以访问。然后中心化的接口通过二次转发来提供服务,这回导致在pb级别向eb级别扩展时有很大的瓶颈。

Ceph消灭了这个瓶颈:Ceph的OSD服务和Ceph客户端是集群感知的。比如Ceph客户端、Ceph OSD服务都知道集群中的其他Ceph OSD服务。这使得Ceph OSD服务可以直接和其他Ceph OSD服务以及Ceph监控服务通信。另外,它使得Ceph客户端可以直接和Ceph OSD服务直接通信。

这种方法也最大化的利用了节点的CPU和RAM, 带来了以下几个主要的好处:

  • OSD 为客户端直接提供服务:因为任何网络设备都有最大并发连接的瓶颈,一个中心化的系统在高扩展的情况下会出现一个很低的物理瓶颈。通过让Ceph客户端直接和Ceph OSD服务通信, Ceph同时增加了性能和系统容量,并且解决了单点失败的问题。Ceph客户端可以维护一个和Ceph OSD服务的会话,而不是一个中心化的系统,只要它们需要这样做。

  • OSD成员和状态:Ceph OSD服务加入到集群中并且汇报它们的状态。在最低级别上,Ceph OSD服务状态是up或者down的,代表着它是否能够处理Ceph客户端的请求。如果Ceph OSD服务是down的,这意味着Ceph OSD服务的异常。如果Ceph OSD服务不在运行(crash了),Ceph OSD服务就不能通知Ceph监控服务它停止运行了。OSD服务周期性的给Ceph监控服务发送消息,如果Ceph监控服务在约定的周期内没有收到OSD服务的消息,它就把这个OSD服务标记为down。这种机制叫做failsafe(failsafe可以理解可以故障的自动保护装置,当OSD出现故障会自动将其排除从而避免引发更大的故障)。当然,OSD服务也会查看它相邻的OSD服务是否停止,然后汇报给Ceph监控服务。这保证了Ceph监控服务是一个轻量级的进程(脏活累活都被OSD服务做了)。

  • 数据清理:作为维护数据一致性和清洁度的一部分,Ceph OSD服务可以在PG(placement group, 放置组)中清理对象。Ceph OSD服务可以将一个PG中的对象和存储在另一个OSD服务中的副本进行元数据的对比。清理操作(通常是每天)会找出bug或文件系统的错误。Ceph OSD服务同样会执行更深层次的清理,通过对对象执行按位的对比。深层次的清理(通常是每周)会找到硬盘上坏的扇区。

  • 数据复制:像Ceph客户端一样,Ceph OSD服务使用CRUSH算法,但是Ceph OSD服务使用它来计算对象的副本应该存储在哪里(或者是重新负载均衡)。在一个典型的写操作场景下,一个客户端使用CRUSH算法来计算对象存储在哪里,将这个对象映射到pool和PG中,然后查看CRUSH映射表来鉴别PG组的主OSD服务

    客户端将对象写入到主OSD服务的PG组中。之后,主OSD携带着他自己的CRUSH映射表的拷贝存储到第二和第三个OSD服务中,这是为了多副本的备份。一旦它确认数据存储成功就返回给客户端。

osd write for replication

因为有这样的数据复制的能力,Ceph OSD服务使得Ceph客户端不需要负责这件事,同时也能保证高的数据可用性和数据安全。

动态集群管理

在高可用和可扩展性章节,我们解释了Ceph如何使用CRUSH、集群感知、智能服务扩展以及维护高可用。Ceph的关键设计就是自治、自我恢复、智能Ceph OSD服务。下面我们更深入的探究一下CRUSH如何在现代化的云存储架构中发挥作用,包括存储数据、集群的重新负载均衡以及动态的从错误中恢复。

关于pool

Ceph存储系统支持一个叫做pool的概念,这是存储对象的逻辑分区。

Ceph客户端从Ceph监控服务中获取集群映射表,然后向pool中写入对象。pool的大小或副本数量、CRUSH的规则以及PG的数量决定了Ceph如何存储数据。

how crush work

pool至少会设置以下参数:

  • 对象的拥有者/访问权限
  • PG的数量
  • 使用的CRUSH规则

映射PG到OSD

每一个pool都有一系列的PG。CRUSH动态的将PG映射到OSD中。当Ceph客户端存储对象,CRUSH将会映射每一个对象到一个PG中。

将对象映射到PG创建了一个中间层,这个中间层位于Ceph OSD服务和Ceph客户端中间。在动态的存储对象时,Ceph存储系统必须能够增长(或收缩)以及重新负载均衡。如果Ceph客户端知道哪个Ceph OSD服务拥有哪一个对象,这就导致Ceph客户端和Ceph OSD服务紧耦合了。相反的,CRUSH算法将每一个对象映射到PG,然后将每一个PG映射到一个或多个OSD服务。这个中间层允许在有新的OSD服务以及新的底层OSD设备上线时,Ceph可以动态重新负载均衡。下面的图表演示了CRUSH如何映射对象到PG,然后映射PG到OSD。

crush mapping

客户端在有集群映射表的拷贝和CRUSH算法的情况下,可以准确的计算出在读取或写入对象时使用哪一个OSD。

计算PG ID

当一个Ceph客户端绑定到一个Ceph监控服务时,它会获取集群映射表的最新拷贝。有了这个集群映射表,客户端知道所有的监控服务,OSD,元数据服务。然而,它并不知道任何关于对象的位置信息。

对象位置需要计算

客户端唯一需要的输入是对象ID和pool。这很简单:Ceph在命名的pool中存储数据。当客户端存储一个命名的对象时,它使用对象名、一个哈希码、pool中PG的数量和pool名来计算PG。Ceph客户端使用以下步骤来计算PG ID。

  • 1.客户端输入pool名和对象ID(比如: pool=”liverpool”, object-id=”john”)

  • 2.Ceph获取对象ID,然后做一下哈希操作

  • 3.Ceph计算哈希值除PG数量的余数,(比如58)得到了PG ID

  • 4.Ceph获取给定的pool名的pool ID(比如”liverpool”=4)

  • 5.Ceph将pool ID放在PG ID之前(比如4.58)

在一个忙碌的会话中,计算对象位置要比执行对象位置查询快很多。CRUSH算法允许客户端计算对象应该存在哪里,这使得客户端可以直接和主OSD服务通信,存储或获取对象。

peering and sets

在之前的章节中,我们提到Ceph OSD服务检查彼此的心跳,然后汇报给Ceph监控服务。Ceph OSD做的另外一件事叫做”窥探(peering)”,这会让所有存储PG的OSD对PG中的对象状态达成一致。事实上,Ceph OSD服务也会汇报”peering”的失败给Ceph的监控服务。Peering的问题通常会自我解决。

对状态达成一致并不意味着PG有最新的内容

Ceph存储集群被设置成一个对象至少有两个副本(比如, size=2),这是数据安全的最小要求。为了高可用,Ceph存储集群应该存储多于两个的副本(比如size=3并且最小size=2)。这样它就能在维护数据安全的时候以一个降级状态继续运行。

重新负载均衡

当你添加一个Ceph OSD服务到集群中,集群映射表会随之更新。根据之前的计算PG ID的方法,这会改变集群映射表。于是,它改变了对象的位置,因为它改变了计算的一个输入参数。下面的图标描述了重新负载均衡的过程。有一些但不是所有的PG会从已存在的OSD(OSD1和OSD2)中迁移到新的OSD(OSD3)中(这个过程虽然看似很简单粗暴,但其实对大型系统影响很小)。即使在重新负载均衡的过程中,CRUSH仍然是稳定可靠的。大多数PG保留它们原有的配置,每一个OSD获取到更多的可用容量,所以在重新负载均衡之后新的OSD没有负载峰值。

ceph rebalance

数据一致性

作为维护数据一致性和清洁度的一部分,Ceph OSD同样可以在PG中清理对象。关于OSD如何维持各个副本的对象一致性,可以参考上面的数据清理。

纠删码(erasure coding)

一个纠删码pool将每个对象分成K+M个块。数据被分为K个数据块和M个编码块。这个pool被配置成K+M的大小,因此每个块都可以存在一个OSD上。块的排序作为对象的属性存储起来。

举例来说,一个使用5个OSD(K+M=5)的纠删码pool可以容忍2块数据的丢失(M=2)。

读写编码块

当一个叫做NYAN的对象包含”ABCDEFGHI”的内容被写入到pool中,擦除编码功能将内容分成3个数据块:第一部分是ABC,第二部分是DEF,第三部分是GHI。如果内容长度不是K的整数倍,会被填充内容。这个功能同样创建两个编码块:第4个是YXY,第5个是QGC。每一个块都被存储到一个OSD中。这些块以同样的名字(NYAN)以对象的形式存储在不同的OSD上,块的创建顺序必须保留,存储对象的属性中(shard_t),作为名字的额外补充。块1包含ABC,存储在OSD5中,块4包含YXY存储在OSD3中。

erasure conding

当对象NYAN从纠删码pool中读取时,解码功能读取三个块: 块1包含ABC,块3包含GHI,块4包含YXY。然后它重建了对象的原始内容ABCDEFGHI。这个解码功能被通知块2和块5是缺失的(它们被称作’擦除’)。块5不会被读取是因为OSD4从集群中退出了。一旦3个块被读取解码功能就会被调用,而此时OSD2因为最慢所以根本没有被考虑进来。

erasure decode

中断完整的写入

在一个纠删码pool中,主OSD接受所有的写操作。它负责编码内容为K+M块,然后将它们发送给其他的OSD。它同样负责维护PG日志的版本号。

在下面的图表中,一个纠删码的PG被创建成K=2 M=1,它支持3个OSD,两个存储K,一个存储M,分别称为OSD1、OSD2、OSD3。一个对象被编码以及存储在OSD中,块D1v1(数据块序号1,版本1)在OSD1上,块D2v1在OSD2上,C1v1(编码块序号1,版本1)在OSD3上。每一个OSD上的PG日志都是唯一的(1,1 是epoch 1. version 1)

interrupted-fill-writes

OSD1是主服务,接受客户端的完整的写入,这意味着写入的数据要完整的替换对象而不是覆盖一部分。版本2(v2)的对象被创建来覆盖版本1(v1)。OSD1将数据编码到3块: D1v2在OSD1上,D2v2在OSD2上,C1v2在OSD3上。每一个块都被发送到目标OSD上。当OSD接受到消息要写入数据块时,它同样创建一个新的PG日志来反映这个改变。举例来说,只要OSD3存储了C1V2,它添加1,2到日志中。因为OSD是异步工作的,当其他块已经存储好了(比如C1v1和D1v1)一些数据块可能还在处理(比如D2v2)。

write replace

如果一切顺利,数据块都会被存储,日志中的last_complete指针会从1,1移动到1,2

full write success

最后,之前的版本都可以被移除了: OSD1上的D1v1,OSD2上的D2v1, OSD3上的C1v1

remove previous version

但是如果发生了异常,OSD1在D2v2仍然写入的时候停止了,对象的版本2只是部分写入:OSD3有一个块但是不足以恢复。它丢失了两个块: D1v2和D2v2,但是纠删码参数K=2,M=1要求至少2个块是可用的,才能恢复第三个块。OSD4成为新的主服务,找到last_complete日志是1,1,这应该是最新的日志了。

osd4 online

日志1,2被发现在OSD3上,这和在OSD4上存储的最新版本1,1不一样、因此1,2被取消,C1v2块被移除。D1v1块被解码功能重建出来存在OSD4上。

rebuild-on-osd4

缓存分层

缓存层提供给Ceph客户端更好的IO性能,因为一部分数据是存储在缓存中的。缓存层包括创建一个快速/昂贵的存储设备的pool,被配置成缓存层,便宜的存储设备被当成存储层使用。Ceph objecter负责处理将对象放在哪里,tiering代理决定什么时候将对象从缓存中写入到后面的存储层。因此缓存层和背后的存储层对Ceph客户端是完全透明的。

cache tiering

ceph协议

Ceph客户端使用本地的协议来和Ceph存储集群通信。Ceph将它的功能打包到librados库中,因此你可以创建自己的Ceph客户端。下面的图表描述了这种基础架构。

basic architecture

本地协议和librados

现在的应用程序需要一个支持异步通信的简单的对象存储接口。Ceph存储集群提供了这样的一个功能。接口提供了直接的,并行的对象访问功能。

  • Pool操作
  • 快照和写时复制克隆
  • 读写对象-创建和移除-整个对象或字节范围-追加或者截短
  • 创建/设置/获取/移除 XATTRS
  • 创建/设置/获取/移除 键值对
  • 复合操作和二次ack语义
  • 对象类

对象监视和通知

客户端可以注册一个对对象的持久关注,保持一个和主OSD的会话打开。客户端可以发送通知消息和一些负载信息给所有的监视者,然后所有监视客户端都会收到通知。这使得客户端可以使用任何对象作为同步通信通道。

watch and notify

数据分片

存储设备都会有吞吐量的限制,这对性能和可扩展性都有很大的影响。大多数存储系统都支持在多个存储设备上分别存储数据的一部分片段,这会增加吞吐量和性能。最常见的就是RAID了,Ceph的分片和RAID 0类似。

Ceph提供了三类客户端: Ceph块设备、Ceph文件系统、Ceph对象存储。Ceph客户端负责转换数据给用户(一个块设备镜像,RESTful对象,CephFS文件系统目录)。

提示: Ceph存储的对象不会被分片。Ceph对象存储,Ceph块设备,Ceph文件系统将它们的数据分片存储成多个Ceph存储系统对象。Ceph客户端通过librados写入数据必须执行分片(以及并行io)才能获取分片的优点。

最简单的Ceph分片格式包含一个对象的分片。Ceph客户端向存储对象写入分片单元,直到对象达到了它的最大容量,然后创建一个新的对象来存储多出来的数据分片。这种最简单的方式对于小的块设备镜像、s3或swift对象以及CephFS文件是足够的。当然,这种简单的方式没有最大化利用Ceph分布式存储数据的优点,串行的方式也没有提升性能。下面的图标描述了这种形式的分片:

simplest strip

如果你想要更大的镜像尺寸,更大的S3或Swift对象,或者更大的CephFS目录,你应该考虑通过将数据分片分布到一个对象集合中的多个对象上来提升读写性能。并行操作会显著的提升写入性能。因为对象会被映射到不同的PG中,然后被映射到不同的OSD中,每一个写入操作都会以最大的写入速度来并行操作。单硬盘的写入会因为柱头的移动和设备的最大带宽(100M/s)而受到限制(比如每次寻道都要6ms)。 通过将写入分布到多个对象上,Ceph可以减少每个硬盘的寻道时间,然后将多个硬盘的吞吐量结合以达到更快的写入(或读取)速度。

注意:分片是独立于对象复制的。因为CRUSH在OSD中复制对象,分片也会自动被复制。

在下图中,客户端数据被分片到一个对象集合中(对象集合1),这个集合包含4个对象,第一个分片单元是位于object 0 的 strip unit0,第4个分片单元是位于object3的strip unit3。当写如第4个分片后,客户端判断对象集合是否满了。如果对象集合没有满,客户端会从第一个对象继续写入分片。如果对象集合满了,客户端重新创建一个对象集合(object set2)。然后开始写入第一个分片(strip unit16)。

strip to multiple object

下面是三个重要的变量决定了Ceph如何对数据分片:

  • 对象大小: Ceph存储系统中的对象有一个配置的最大容量(比如2MB, 4MB)。这个对象大小应该足够大,以容纳很多个分片单元。

  • 分片宽度:分片有一个配置的单元大小(比如64kb)。Ceph客户端分割数据成同样大小的分片单元,除了最后一个。分片宽度应该是对象大小的一小部分,这样一个对象就可以包含多个分片单元了。

  • 分片数量:Ceph客户端在指定分片数量的对象上写入一连串的分片单元。这些指定数量的对象叫做一个对象集合。在Ceph客户端写完对象集合中的最后一个对象后,它又开始从对象集合中的第一个对象开始写入。

    重要:在投入到生产环境之前应该测试分片配置的性能。写入数据之后就不能修改这些配置了。

    一旦Ceph客户端将数据分片,然后将分片单元映射到对象中,Ceph的CRUSH算法将对象映射到PG中,然后PG被映射到Ceph OSD服务中。

    Ceph客户端

    Ceph客户端包含一系列的服务接口,包括:

    • 块设备:块设备(RBD)服务提供可变大小的、精简配置的块设备,块设备支持快照和复制。Ceph在整个集群中将块设备分片来提高性能。Ceph支持内核对象(KO)和QEMU的管理程序直接使用librbd——避免虚拟系统的内核对象过载。

    • 对象存储:对象存储(RGW)服务提供RESTful API,和Amazon S3以及OpenStack Swift兼容。

    • 文件系统:文件系统(CephFS)服务提供了POSIX兼容的文件系统,可以使用mount这样的命令,或者作为一个用户空间的文件系统。

Ceph可以运行OSD, MDS, 监控服务的额外实例,这些会提高可扩展性和高可用性。下面的图从高层架构上描述了这个情况:

high level architecture

Ceph对象存储

Ceph对象存储服务叫做radosgw,是一个FastCGi程序,提供了RESTful HTTP API来存储对象和元数据。它是在Ceph存储集群的顶层,拥有自己的数据格式,维护自己的用户数据库,认证和访问控制。RADOS入口使用统一的命名空间,这意味着你既可以使用OpenStack Swift兼容的API,也可以使用Amazon S3兼容的API。举例来说,你可以使用S3的API写入数据,使用Swift的API读取数据。

Ceph块设备

Ceph块设备将一个块设备镜像分片到多个Ceph存储集群的对象上,每一个对象都会映射到不同的PG,PG也会分布到不同的ceph-osd服务上。

精简配置的,可生成快照的Ceph块设备是虚拟化和云计算的吸引点。在虚拟机的场景下,人们可以在QEMU/KVM中使用rbd网络存储驱动来部署ceph块设备,主机使用librbd来给客户提供块设备服务。需要云计算栈使用libvirt来集成管理程序。你可以在QUME中使用精简配置的Ceph块设备,使用libvirt来支持OpenStack、CloudStack或其他解决方案。

因为我们目前不提供librbd支持其他的管理程序,你也同样可以使用Ceph块设备内核对象来提供一个块设备给一个客户端。其他的虚拟化技术,比如Xen可以访问Ceph块设备的内核对象。这可以使用命令行工具rbd完成。

Ceph文件系统

Ceph文件系统(CephFS)提供了POSIX兼容的文件系统,处于基于对象的Ceph存储集群之上。

ceph filesystem

Ceph文件系统服务包括Ceph元数据服务(MDS)。MDS的目的是存储所有的文件系统元数据(目录,文件拥有者,访问模式等等),元数据都是存储在内存中的。原因是MDS是的一般操作(ls, cd)会对Ceph OSD服务造成巨大的压力。因此将元数据和数据本身分离开是提供高性能服务的必要条件。

一些不错的文章

https://zhuanlan.zhihu.com/p/58888246

cfssl生成证书并部署

安装

cfssl是基于go的,因此需要安装go

go get -u github.com/cloudflare/cfssl/cmd/cfssl
go get -u github.com/cloudflare/cfssl/cmd/cfssljson

安装完成后如果不能直接使用cfssl请检查$GOPATH/bin是否加入到PATH环境变量中。

执行cfssl查看一下cfssl提供的命令

Usage:
Available commands:
        sign
        serve
        gencsr
        ocspsign
        ocspserve
        revoke
        certinfo
        version
        crl
        gencert
        ocsprefresh
        scan
        info
        print-defaults
        bundle
        genkey
        gencrl
        ocspdump
        selfsign
Top-level flags:

创建CA

CA的全称是Certificate Authority,也叫证书授权中心。CA的作用是作为一个权威的、被信任的第三方机构,提供管理和签发证书。在使用https访问一个网站时,为了证明这个网站是可信任的,那么就需要使用CA颁发的证书来证明自己。

因为在内网环境中搭建docker-registry,必然不可能用到互联网上的CA机构,因此我们需要自己扮演这个角色。首先创建文件夹

mkdir ca
cfssl print-defaults config > ca/ca-config.json
cfssl print-defaults csr > ca/ca-csr.json

然后修改ca-config.json

{
    "signing": {
        "default": {
            "expiry": "2540400h"
        },
        "profiles": {
            "www": {
                "expiry": "2540400h",
                "usages": [
                    "signing",
                    "key encipherment",
                    "server auth"
                ]
            },
            "client": {
                "expiry": "2540400h",
                "usages": [
                    "signing",
                    "key encipherment",
                    "client auth"
                ]
            }
        }
    }
}

生成ca

cfssl gencert -initca ca/ca-csr.json | cfssljson -bare ca/ca -

生成服务器证书

mkdir server
cfssl print-defaults csr > server/server.json

修改server.json

{
    "CN": "docker-registry",
    "hosts": [
        "docker-registry.k8s",
        "www.docker-registry.k8s"
    ],
    "key": {
        "algo": "ecdsa",
        "size": 256
    },
    "names": [
        {
            "C": "CN",
            "ST": "SH",
            "L": "Shanghai"
        }
    ]
}

签发证书

cfssl gencert -ca=ca/ca.pem -ca-key=ca/ca-key.pem -config=ca/ca-config.json -profile=www server/server.json | cfssljson -bare server/server

使用

上面两步得到了ca.pem, server-key.pem, server.pem。ca.pem是ca的证书,server-key.pem是服务器证书的私钥,server.pem是服务器证书

为了在k8s中使用,我们需要先创建默认的tls secret

kubectl -n docker-registry create secret tls docker-registry-tls-cert --key=server/server-key.pem --cert=server/server.pem

为电脑导入ca证书

sudo mkdir /usr/share/ca-certificates/extra
sudo cp ca.pem /usr/share/ca-certificates/extra/ca.crt

运行命令更新证书

sudo dpkg-reconfigure ca-certificates

然后使用curl查看ca根证书是否安装成功

curl -v https://docker-registry.k8s

显示一下内容表示成功

* TLSv1.3 (OUT), TLS handshake, Client hello (1):
* TLSv1.3 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-ECDSA-AES256-GCM-SHA384
* ALPN, server accepted to use http/1.1
* Server certificate:
*  subject: C=CN; ST=SH; L=SH; CN=DR
*  start date: Jun 26 06:37:00 2019 GMT
*  expire date: Apr 17 06:37:00 2309 GMT
*  subjectAltName: host "docker-registry.k8s" matched cert's "docker-registry.k8s"
*  issuer: C=US; ST=CA; L=San Francisco; CN=example.net
*  SSL certificate verify ok.
> GET / HTTP/1.1
> Host: docker-registry.k8s
> User-Agent: curl/7.64.0
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: nginx/1.15.9
< Date: Wed, 26 Jun 2019 11:40:04 GMT
< Content-Length: 0
< Connection: keep-alive
< Cache-Control: no-cache
< 
* Connection #0 to host docker-registry.k8s left intact

这里有一点需要注意的,我的电脑上安装了anaconda的环境,因此curl是在anaconda下的curl,它会默认加载/home/username/anaconda3/certs/cacert.pem这个。因此我需要将/etc/ssl/certs/ca-certificates.crt覆盖掉这个文件才能默认加载。或者也可以使用--cacert指定根目录文件的位置。

或者可以参考这篇详细的说明: https://jite.eu/2019/2/6/ca-with-cfssl/#

chrome下的相关设置

即使在我将根证书设置好并且验证成功,但是chrome仍然会有不安全的标识,这是因为需要为chrome单独导入根证书,设置的路径为: 设置 -> 高级 -> HTTPS相关设置。

go处理多态的JSON

最近使用go在对h2o的REST API进行封装时,发现了h2o的JSON返回值中有Polymorphic类型的字段。Polymorphic可以翻译为多态。一个多态的类型可以理解为既可能是float型,也可能是string类型等等。

在我的理解中,一个JSON中每个字段的类型都必须是确定的,在动态语言比如php, js这种,处理这种不确定的类型很方便。但是在go这种静态语言中,一个不确定的类型导致在解析中总是会出现类似于这样子的错误: json: cannot unmarshal string into Go struct field Foo.Value of type int

使用interface{}处理多态的问题

在go中,interface{}是一个空接口,那么所有类型都可以看做实现了这个空接口,因此interface{}可以接收任何类型的值。那么如果一个json既可能是{"mean": 123}, 又可能是{"mean": "NaN"},就可以使用以下结构体:

type Prediction struct {
    Mean interface{} `json:"mean"`
}

但是在使用Mean这个变量的时候,就比较麻烦了。

t2 := `{"mean": "NaN"}`

var p2 Prediction
err = json.Unmarshal([]byte(t2), &p2)

if err != nil {
    log.Println(err)
} else {
    if mean, ok := p2.Mean.(string); ok {
        log.Printf("p2 mean: %s \n", mean)
    } else if n, ok := p2.Mean.(int); ok {
        mean = strconv.Itoa(n)
        log.Printf("p2 mean: %s \n", mean)
    }
}

实现Unmarshaler和Marshaler接口

type Marshaler interface {
        MarshalJSON() ([]byte, error)
}

type Unmarshaler interface {
        UnmarshalJSON([]byte) error
}

在使用json.Unmarshaljson.Marshal时,会自动调用对应变量类型的UnmarshalJSONMarshalJSON方法。这样就可以定义一个FlexString类型

type FlexString string
type Prediction struct {
    Mean FlexString `json:"mean"`
}

然后实现*FlexString的UnmarshalJSON方法和FlexString的MarshalJSON方法。

func (fs *FlexString) UnmarshalJSON(b []byte) error {
    if b[0] == '"' {
        return json.Unmarshal(b, (*string)(fs))
    }
    var n float64
    if err := json.Unmarshal(b, &n); err != nil {
        return err
    }

    s := strconv.FormatFloat(n, 'g', 15, 64)

    *fs = FlexString(s)
    return nil
}

func (fs FlexString) MarshalJSON() ([]byte, error) {
    s := string(fs)
    n, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return json.Marshal(s)
    }

    if math.IsNaN(n) {
        return json.Marshal(s)
    }

    return json.Marshal(n)
}

这样在使用的时候,就可以正常的对多态的Mean进行json解码以及编码了。

func main() {
    t1 := `{"mean": 123.1212}`
    var p1 Prediction
    err := json.Unmarshal([]byte(t1), &p1)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p1 mean value:", p1.Mean)
    }

    res1, err := json.Marshal(p1)
    if err != nil {
        log.Println("res1 error:", err)
    } else {
        log.Println("res1 :", string(res1))
    }

    t2 := `{"mean": "NaN"}`
    var p2 Prediction
    err = json.Unmarshal([]byte(t2), &p2)
    if err != nil {
        log.Println(err)
    } else {
        log.Println("p2 mean value:", p2.Mean)
    }

    res2, err := json.Marshal(p2)
    if err != nil {
        log.Println("res2 error:", err)
    } else {
        log.Println("res2 :", string(res2))
    }
}

运行结果如下:

2019/07/09 16:38:42 p1 mean value: 123.1212
2019/07/09 16:38:42 res1 : {"mean":123.1212}
2019/07/09 16:38:42 p2 mean value: NaN
2019/07/09 16:38:42 res2 : {"mean":"NaN"}

123.1212在解码再编码之后,仍然是float类型,NaN仍然是string类型。这里有一个注意事项,就是n, err := strconv.ParseFloat(s, 64)这个方法,如果s是NaN的字符串并不会出错,而是将n变成一个NaN,因此需要使用math.IsNaN进行检查。