理解kubernetes service

理解service的角度

这篇文章不是关于如何使用kubernetes中的service,而是尝试整理我自己对service的看法,然后加深对service的理解。那么,我是从哪几个角度去看待service呢?

  • service是服务的稳定性保证
  • service是集群中的load balance
  • 通过无selector的service去理解VIP(虚拟ip)
  • service的设计,和不同实现方式的性能

service是服务的稳定性保证

在k8s集群中,无状态的pod副本是可以随时删除、随时创建的,并且重新创建的pod不再保留旧的pod的任何信息,包括ip地址。在这样的情况下,前端应用如何使用后端的这些pod来提供服务就成了问题,因此k8s实现了service这样一个抽象的概念。对于有selector的service,它在被创建的时候会自动创建endpoint资源,这个endpoint中包含了所有的pod的ip和端口,并且在之后的pod的删除、创建中,这个endpoint中会立即更新相关pod的ip和端口信息。同时,service的ip地址是永远固定的,service和endpoint是一一对应的关系。这样,如果前端应用通过固定的service ip来访问pod提供的服务,那么就可以在endpoint中找到一个可用的pod的ip和端口,然后通过一些操作(这个在后面会整理)将数据包转发到指定的pod上即可。

# 你可以通过kubectl查看service和endpoint来加深理解

$ kubectl -n h2o describe svc h2o

Name:              h2o
Namespace:         h2o
Labels:            app=h2o
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"h2o"},"name":"h2o","namespace":"h2o"},"spec":{"clusterIP...
Selector:          app=h2o
Type:              ClusterIP
IP:                None
Port:              web  54321/TCP
TargetPort:        54321/TCP
Endpoints:         10.42.1.33:54321,10.42.2.139:54321
Session Affinity:  None
Events:            <none>


$ kubectl -n h2o get endpoints h2o
NAME   ENDPOINTS                            AGE
h2o    10.42.1.33:54321,10.42.2.139:54321   4h45m

service通过ip地址的固定来保证服务的稳定性。那为啥service就是可以固定不变的呢?这是因为service本身就是一个抽象的概念啊,它不是一个正在运行的进程,只是一条数据,也正因为如此,它的ip地址和端口号也是不存在的,这些都是存储在etcd中的一条数据。那么k8s是如何通过这样一个虚假的ip和端口将请求转发到真实存在的pod中呢?这就是后面要说的内容了。

service是集群中的load balance

在上一节说到,一个service会对应一个endpoint,这个endpoint中会保存所有当前匹配到的pod的ip和端口号。那么现在有一个http请求过来了,发现endpoint中有三个待选的pod,那么我们使用一定的方式比较公平的选择出一个pod,就轻松的达到了负载均衡的效果。

service load balance

那么k8s中,load balance的策略是什么样的呢?因为不同的service实现方式使用的方法不同,这个内容会在后面整理。

通过无selector的service去理解VIP(虚拟ip)

在前面的内容中,service一直和endpoint、pod关联在一起,那么如果我们的service没有selector,就不会创建endpoint了,也不会关联pod。前面也提到了service是一个抽象的概念,其拥有的ip和port都是假的。其实这个叫做VIP(virtual ip)。那么,如何通过无selector的service来理解VIP呢?

在k8s中创建无selector service的时候,不会自动创建关联的endpoint,更不会去匹配pod了。但是这样的service仍然是拥有ip和port的。我们可以尝试一下:

svc-without-selector.yaml

apiVersion: v1
kind: Service
metadata:
  name: my-service
spec:
  ports:
    - protocol: TCP
      port: 8081
      targetPort: 8081
kubectl apply -f svc-without-selector.yaml

查看一下这个svc的详情:

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         <none>
Session Affinity:  None
Events:            <none>

除了拥有ip和端口号,就什么都没有了。这就是说service为什么就是一条数据的原因,10.43.12.208也就是一个VIP。

对于无selector的service还有一个用处,就是让集群内部的应用可以稳定的访问到集群外部的服务。因为service是稳定的,那么集群内部都可以访问这个service,然后让这个service将请求转发到集群外。

这里我们可以手动创建一个endpoint,这个endpoint包含了集群外的两个http服务

apiVersion: v1
kind: Endpoints
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 192.168.50.99
      - ip: 192.168.50.201
    ports:
      - port: 8081

然后我们先检查一下service,发现endpoints已经更新了。

$ kubectl describe svc my-service

Name:              my-service
Namespace:         default
Labels:            <none>
Annotations:       kubectl.kubernetes.io/last-applied-configuration:
                     {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"my-service","namespace":"default"},"spec":{"ports":[{"port":8081,...
Selector:          <none>
Type:              ClusterIP
IP:                10.43.12.208
Port:              <unset>  8081/TCP
TargetPort:        8081/TCP
Endpoints:         192.168.50.201:8081,192.168.50.99:8081
Session Affinity:  None
Events:            <none>

我们在集群内部访问一下(使用kubectl exec到一个pod上):

$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1
$ wget my-service:8081 -q -O out | cat out
server 2
$ wget my-service:8081 -q -O out | cat out
server 1

service的设计,和不同实现方式的性能

service的设计是以提高性能为前提不断的演进的,这里是关于Service的设计讨论: DESIGN: Services v2。感兴趣的还可以看看k8s-release-v1.0的时候对service的描述: Service

service的设计中有4个角色: Pod、 Service、Ambassador、Portal

  • Pod: k8s集群中的最小调度单位,包含一个或多个容器
  • Service: 一组pod的集合,由标签选择器来关联
  • Ambassador: 中文翻译是大使,是一段可执行的逻辑,它负责实现客户端访问Service,然后将请求转发到一个对应的Pod上。这个Ambassador可以是一个云服务商的服务,也可以是一个单独的pod(比如haproxy),或者是每个节点都有的共享进程(kube-proxy)。
  • Portal: 固定的ip:port对,客户端只要访问这个Portal,请求自然会被转发到Ambassador上,客户端不需要理解Ambassador的具体实现。

最初的设计中是有三种方案,

方案一: 每个服务一个ip,共享的Ambassador。这个ip就是上面说的Portal ip。将服务以及ip、端口广播给所有的kube-proxy实例。kube-proxy设置好iptables来“窃取”所有到Portal(ip,port)的请求,然后将这个请求转发到自己的某个端口上。这里kube-proxy扮演的是Ambassador角色,它会使用round-robin的方法来把请求均衡的分发到Service后面的Pod上。这个方案里,有以下的优点和缺点:

优点:
– 不会有端口冲突
– Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
– iptables可以放在root namespace,即使pods重启了也不需要更新iptables(这是因为iptables是负责将到service ip:port的流量转发到kube-proxy的一个端口上即可)。
– 不需要在pod上预先声明需要的Service。

缺点:

  • kube-proxy是多租户的(需要为所有的service做流量转发)
  • 从kube-proxy转发的流量的源ip不是真实的源ip,
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 当service数量级上千后可扩展性不高

方案二: 每个服务一个ip,私有的Ambassador。对每个pod来说,都有一个私有的的ambassador,这要求pod需要先声明它们想先访问那个服务(否则的话,对于集群中的每次Service的添加和删除,都需要kubelet或其他的root-namespace、true-root的用户代理变动到每个pod的namespace下。[iptables规则需要root用户]),这样才能在pod的命名空间下建立iptables规则。

优点:

  • 不会有端口冲突
  • Service的ip和port都是固定的,方便做DNS A (forward) 和 PTR (reverse)和 SRV 记录。
  • 代理不是多租户的
  • 从kube-proxy转发的流量的源ip是真实的源ip,
  • 容易从方案一迁移
  • 需要pod预先声明服务(结构良好)

缺点:

  • iptables是配置在pod的namespace下,但是pod的命名空间重启了就必须重新运行一次
  • 需要为portal预留虚拟ip空间
  • 需要master跟踪和检查所有的portal ip
  • 需要pod预先声明服务(目前还没实现)

方案三:localhost的portal,私有的ambassador

不同于给service分配ip,而是使用本地的端口号作为portal。

介绍完这三种方案后,就可以引入service最终的演进了: userspace->iptables->ipvs。

这里先放一张iptables的工作流程图,方便理解:

iptables

userspace模式

这里的userspace就是方案一的实现,在k8s 1.0的发布中正式启用。userspace的工作原理图如下:

userspace service overview

这种模式,kube-proxy 会监视 Kubernetes master 对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会在本地 Node 上打开一个端口(随机选择)。 任何连接到“代理端口”的请求,都会被代理到 Service 的backend Pods 中的某个上面(如 Endpoints 所报告的一样)。 使用哪个 backend Pod,是 kube-proxy 基于 SessionAffinity 来确定的。

最后,它安装 iptables 规则,捕获到达该 Service 的 clusterIP(是虚拟 IP)和 Port 的请求,并重定向到代理端口,代理端口再代理请求到 backend Pod。默认情况下,用户空间模式下的kube-proxy通过round-robin选择后端。

这里有一个问题在于,client访问service的clusterIP时,iptables会把流量转发到kube-proxy的某个端口上,这样的话,每次转发都有一个内核态用户态的转换。

iptables模式

iptables service overview

这种模式,kube-proxy 会监视 Kubernetes 控制节点对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会安装 iptables 规则,从而捕获到达该 Service 的 clusterIP 和端口的请求,进而将请求重定向到 Service 的一组 backend 中的某个上面。 对于每个 Endpoints 对象,它也会安装 iptables 规则,这个规则会选择一个 backend 组合。

默认的策略是,kube-proxy 在 iptables 模式下随机选择一个 backend。类似于这样

iptables -t nat -A PREROUTING -p tcp -d 15.45.23.67 --dport 80 -j DNAT --to-destination 192.168.1.1-192.168.1.10

使用 iptables 处理流量具有较低的系统开销,因为流量由 Linux netfilter 处理,而无需在用户空间和内核空间之间切换。 这种方法也可能更可靠。

如果 kube-proxy 在 iptable s模式下运行,并且所选的第一个 Pod 没有响应,则连接失败。 这与用户空间模式不同:在这种情况下,kube-proxy 将检测到与第一个 Pod 的连接已失败,并会自动使用其他后端 Pod 重试。

您可以使用 Pod readiness 探测器 验证后端 Pod 可以正常工作,以便 iptables 模式下的 kube-proxy 仅看到测试正常的后端。 这样做意味着您避免将流量通过 kube-proxy 发送到已知已失败的Pod。

ipvs模式

ipvs是在Kubernetes v1.11正式可用的。ipvs也是依赖于iptables的,但是它的性能更高。

ipvs service overview

在ipvs模式下,kube-proxy监视Kubernetes服务和端点,调用netlink接口相应地创建IPVS规则,并定期将IPVS规则与Kubernetes服务和端点同步。该控制循环可确保IPVS状态与所需状态匹配。访问服务时,IPVS 将流量定向到后端Pod之一。

IPVS代理模式基于类似于iptables模式的netfilter挂钩函数,但是使用哈希表作为基础数据结构,并且在内核空间中工作。 这意味着,与iptables模式下的 kube-proxy 相比,IPVS 模式下的 kube-proxy 重定向通信的延迟要短,并且在同步代理规则时具有更好的性能。与其他代理模式相比,IPVS 模式还支持更高的网络流量吞吐量。

IPVS提供了更多选项来平衡后端Pod的流量。 这些是:

  • rr: round-robin
  • lc: least connection (smallest number of open connections)
  • dh: destination hashing
  • sh: source hashing
  • sed: shortest expected delay
  • nq: never queue

注意:
要在IPVS模式下运行kube-proxy,必须在启动kube-proxy之前使IPVS Linux在节点上可用。

当 kube-proxy 以 IPVS 代理模式启动时,它将验证 IPVS 内核模块是否可用。 如果未检测到 IPVS 内核模块,则 kube-proxy 将退回到以 iptables 代理模式运行。

ipvs在同步规则、网络带宽、cpu/内存消耗上都明显优于iptables,关于具体的性能数据可以看这篇文章: 华为云在 K8S 大规模场景下的 Service 性能优化实践。ipvs的详细介绍可以看这篇文章:ipvs 基本介绍。ipvs和iptables的对比:kube-proxy 模式对比:iptables 还是 IPVS?

rook ceph的rgw崩溃问题排查

问题

在开发可视化机器学习平台时,集成的FastRCNN实验一直跑不到结束就会出错。有时候是在下载基础模型以及代码包时出错,有时候在train结束后向predict传递artifacts出错。

过程

首先这个问题出现在局域网内,处于开发环境,因此ceph没有做高可用的部署。其次,ceph是用rook这个项目部署在k8s集群中的。

最后,在使用argo做机器学习的资源调度时,会出现大的数据资源下载和转移出现错误。具体表现为:大量数据下载会出现connectiion refused,日志如下:

2019-10-25 02:35:24 (20.1 MB/s) - Connection closed at byte 528482304. Retrying.
--2019-10-25 02:35:25--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/tho6wHm0UmZeZYbfWBv5lkOZ576838763
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

大量数据上传时也会中断,导致argo无法调用下一步:

日志如下:

NAME            custom-workflow-43-6rhlb.api-train-faster-1699
TYPE            Pod
PHASE           Error
MESSAGE         failed to save outputs: timed out waiting for the condition
START TIME      2019-10-24T06:15:56Z
END TIME        2019-10-24T06:30:34Z
DURATION        14:38 min

这里可能是网络问题,argo的问题或者是ceph的问题。但是当数据量不大的时候不会出现错误。因此检查ceph是否正常

~ » kubectl -n rook-ceph get pods                                                
NAME                                           READY   STATUS      RESTARTS   AGE
csi-cephfsplugin-964zm                         3/3     Running     27         46d
csi-cephfsplugin-dxnbg                         3/3     Running     12         46d
csi-cephfsplugin-provisioner-b66d48bc8-fglq9   4/4     Running     0          12d
csi-cephfsplugin-provisioner-b66d48bc8-x67pd   4/4     Running     0          12d
csi-rbdplugin-5fs2x                            3/3     Running     27         46d
csi-rbdplugin-bddlt                            3/3     Running     12         46d
csi-rbdplugin-provisioner-95dd85d6-7kc4c       5/5     Running     0          12d
csi-rbdplugin-provisioner-95dd85d6-mpjtj       5/5     Running     0          12d
rook-ceph-agent-fs4xq                          1/1     Running     9          46d
rook-ceph-agent-wx6r4                          1/1     Running     4          46d
rook-ceph-mds-myfs-a-774974c8c4-xt2ls          1/1     Running     0          12d
rook-ceph-mds-myfs-b-748d7d7f7d-wftt5          1/1     Running     0          12d
rook-ceph-mgr-a-5f54d44c98-57qcb               1/1     Running     0          12d
rook-ceph-mon-a-6f9fbfc99d-lmb6c               1/1     Running     0          17d
rook-ceph-operator-6f556bcbff-glvt6            1/1     Running     0          12d
rook-ceph-osd-0-7c489dc87b-wkt7x               1/1     Running     0          17d
rook-ceph-osd-1-86cc67cc45-25h4q               1/1     Running     0          12d
rook-ceph-osd-prepare-worknode1-xnmpw          0/1     Completed   0          12d
rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm      1/1     Running     65         12d
rook-ceph-tools-5f5dc75fd5-52jbj               1/1     Running     0          12d
rook-discover-g95ws                            1/1     Running     6          46d
rook-discover-vs5fs                            1/1     Running     13         46d

发现ceph rgw重启了65次,这个肯定是不正常的。查看ceph rgw的日志:

$ kubectl -n rook-ceph logs -p rook-ceph-rgw-my-store-a-66b7d8cc9d-vrhkm

# 截取了一部分日志
debug 2019-10-25 02:35:13.473 7f4880b98700  1 ====== starting new request req=0x55aa678c48e0 =====
debug 2019-10-25 02:35:14.549 7f4880b98700  0 ERROR: client_io->complete_request() returned Broken pipe
debug 2019-10-25 02:35:14.549 7f4880b98700  1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== starting new request req=0x55aa54a488e0 =====
debug 2019-10-25 02:35:19.949 7f48e345d700  1 ====== req done req=0x55aa54a488e0 op status=0 http_status=404 latency=0s ======

在我的理解中broken pipe一般出现在向已关闭的连接中写入数据时,会出现这个问题。但是通过日志可以发现,出现broken pipe的错误之后,rgw仍然是在处理请求的,但是部分请求的latency很高。因此这里的broken pipe是表示着rgw开始出现一些异常情况,但不是pod重启的直接原因。

真正导致rgw被杀死的原因是因为rgw进程收到了sigterm信号,然后进程被杀死。

debug 2019-10-25 02:35:24.525 7f494c52f700 -1 received  signal: Terminated from Kernel ( Could be generated by pthread_kill(), raise(), abort(), alarm() ) UID: 0
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm
debug 2019-10-25 02:35:24.525 7f494c52f700  1 handle_sigterm set alarm for 120
debug 2019-10-25 02:35:24.525 7f4962116780 -1 shutting down
debug 2019-10-25 02:35:24.629 7f488cbb0700  0 iterate_obj() failed with -9

使用kubectl describe查看pod的event:

Events:
  Type     Reason     Age                  From                Message
  ----     ------     ----                 ----                -------
  Normal   Killing    15m (x65 over 12d)   kubelet, worknode1  Container rgw failed liveness probe, will be restarted
  Warning  Unhealthy  15m (x249 over 12d)  kubelet, worknode1  Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers)
  Normal   Pulled     15m (x66 over 12d)   kubelet, worknode1  Container image "ceph/ceph:v14.2.2-20190826" already present on machine
  Normal   Created    15m (x66 over 12d)   kubelet, worknode1  Created container rgw
  Normal   Started    15m (x66 over 12d)   kubelet, worknode1  Started container rgw

这里才是真正的重启原因,kubelet检查pod是否存活,但是请求超时了。意味pods出现的故障,因此杀死了pod并重启。

pod的liveness设置:

Liveness:       http-get http://:80/ delay=10s timeout=1s period=10s #success=1 #failure=3

k8s的liveness机制是检查pod中应用程序存活状态并在出错后自动重启的一种机制。提供了三种方式:

  • 在容器内执行命令,如果执行成功,则表示容器是存活并且健康的。否则就重启容器使得应用程序恢复正常。
  • 使用http请求检查,如果返回的状态码是200则表示正常,否则表示失败。
  • 使用tcp连接检查,如果kubelet可以打开指定端口的socket连接,则表示正常,否则表示失败。

在这个场景下出现Warning Unhealthy 15m (x249 over 12d) kubelet, worknode1 Liveness probe failed: Get http://10.42.2.44:80/: net/http: request canceled (Client.Timeout exceeded while awaiting headers),表示kubelet使用http get检查pod的80端口,但是这个请求却超时了。因此杀死了容器并重启,导致大文件(700MB以上)上传/下载失败。

这里kubelet检查的是http://10.42.2.44:80这个地址,我们回过头看一下argo那边的报错信息,Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.。都是80端口,当然这里千万不能被ip地址误导了,10.42.2.44是pod的ip地址,10.43.126.166是service的ip地址,我们可以验证一下:

~ » kubectl -n rook-ceph get svc                                                 
NAME                              TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
rook-ceph-rgw-my-store            ClusterIP   10.43.126.166   <none>        80/TCP              46d

然后再结合之前debug 2019-10-25 02:35:14.549 7f4880b98700 1 ====== req done req=0x55aa678c48e0 op status=0 http_status=200 latency=1.076s ======这条日志,latency已经超过了1s,而kubelet的liveness超时时间是1s。

现在基本可以得出以下异常流程:

  1. 因为某些原因,导致rgw出现broken pipe的出错,并且部分请求的lantency时间大大提高。
  2. kubelet周期性的对rgw做liveness的检查,并且检查的http就是rgw的80端口,这个端口因为上面的原因导致lantency超过了1s,而liveness检查的timeout只有1s。因此kubelet认为该pod不健康,选择重启。
  3. kubelet向rgw发送了sigterm信号,rgw关闭进程,pod重启。

猜测可能造成这个问题的原因:

  1. ceph所在机器的性能不够,导致响应请求出现问题。
  2. 局域网的网络问题,因为内部的最高带宽只有10MB/s,但是局域网内的设备很多,网络这部分导致了瓶颈。

关于机器性能的问题,我认为是可以排除的,因为机器性能本身很好,并且开发环境几乎没有请求量,接下来就是验证是因为网络问题导致瓶颈,造成部分接口延迟过高被杀死。

为了验证这个猜想,假设这里有三台机器A,B,C,组成了一个k8s集群,ceph是部署在k8s之上的。在A之上,我用dd命令产生一个40G的大文件,然后在B之上使用wget下载。然后在C上面观察ping的延迟是否上升。

A:

$ dd if=/dev/zero of=test bs=1M count=0 seek=40000

$ python -m SimpleHTTPServer 8999

B:

$ wget http://192.168.50.37:8999/test

--2019-10-25 14:18:30--  http://192.168.50.37:8999/test
正在连接 192.168.50.37:8999... 已连接。
已发出 HTTP 请求,正在等待回应... 200 OK
长度: 41943040000 (39G) [application/octet-stream]
正在保存至: “test”

test      3%[==>            ]   1.54G  11.1MB/s    剩余 73m 5s

C:

$ ping 192.168.50.37
PING 192.168.50.37 (192.168.50.37) 56(84) bytes of data.
64 bytes from 192.168.50.37: icmp_seq=1 ttl=64 time=0.384 ms
64 bytes from 192.168.50.37: icmp_seq=2 ttl=64 time=0.373 ms
64 bytes from 192.168.50.37: icmp_seq=3 ttl=64 time=0.336 ms
64 bytes from 192.168.50.37: icmp_seq=4 ttl=64 time=4.90 ms
64 bytes from 192.168.50.37: icmp_seq=5 ttl=64 time=1.18 ms
64 bytes from 192.168.50.37: icmp_seq=6 ttl=64 time=7.74 ms
64 bytes from 192.168.50.37: icmp_seq=7 ttl=64 time=3.51 ms
64 bytes from 192.168.50.37: icmp_seq=8 ttl=64 time=6.66 ms
64 bytes from 192.168.50.37: icmp_seq=9 ttl=64 time=6.31 ms

网络延迟增加的还是很明显的。

这时候使用argo开始一个新的机器学习的实验,但是这个实验的数据量较小,在之前的使用中都没有问题。结果确实出现了问题:

2019-10-25 06:21:04 (8.10 MB/s) - Connection closed at byte 136314880. Retrying.
--2019-10-25 06:21:05--  (try: 2)  http://rook-ceph-rgw-my-store.rook-ceph/workflow-storage/maC8Om6Y3QbSJxSTT9x82rp8908272441
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.
Resolving rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)... 10.43.126.166
Connecting to rook-ceph-rgw-my-store.rook-ceph (rook-ceph-rgw-my-store.rook-ceph)|10.43.126.166|:80... failed: Connection refused.

那么为了有对照实验,将下载关闭,重新做这个机器学习的实验,结果正常。

解决方法

因为缺少了对ceph这块源代码的研究,上面的结论并不一定正确。但是可以大概得出如何解决,可以先尝试将liveness检测的timeout时间增加。

kubectl -n rook-ceph edit deployment rook-ceph-rgw-my-store-a

把liveness的timeout时间调成5s,这样就解决了这个问题。

理解go context

理解context

在我刚接触context包时,我是有一点迷惑的。因为在其他的编程语言中很少有接触到context包类似的用法。比如在js绘制canvas中的context,也只是作为保留上下文操作来用的。在go语言的context包中,同样也可以当成上下文来理解,但是在看待context提供的能力时,要从以下两点来理解:

  • context提供了一种管理多个goroutine的机制。
  • context最终形成了一种树形结构。

在深入之前,让我们回忆一下多线程/进程模型中,主线程/进程是如何管理子线程/进程的。如果子线程/进程又派生了其他的线程/进程呢?这一定是一个头疼的问题。

在go语言中,协程也面临了同样的问题。因此官方在go1.7版本中引入了context包。那么context提供了什么样的能力来管理协程呢?先看一个withCancel的简单的例子

func watch(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            log.Println("退出")
            return

        default:
            log.Println("执行逻辑")
            time.Sleep(2 * time.Second)
        }
    }
}

func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())

    go watch(ctx)
    go watch(ctx)
    go watch(ctx)

    time.Sleep(6 * time.Second)
    fmt.Println("可以了,通知子协程停止")
    cancel()
    //为了检测子协程是否停止,如果没有输出,就表示停止了
    time.Sleep(5 * time.Second)
}

调用withCancel的输出如下:

2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:48 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:50 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
2019/09/30 00:18:52 执行逻辑
可以了,通知子协程停止
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出
2019/09/30 00:18:54 退出

可以看到,我们通过context.WithCancel方法生成了一个ctx和一个cancel,然后主动调用cancel就可以通过所有的子协程退出了。在watch方法的实现中,我们是通过select机制来实现的,一旦context的Done()方法有值,就会调用return退出,否则的话就执行default中我们的业务逻辑。

这样我们就可以随时通知所有的子协程退出了。在上面说到,context最终形成了一种树形结构,是因为在子协程中也可以继续使用新的协程,这样就形成了一个树形的调用了。

协程的树形结构

我们在1中使用cancel方法,就可以向下传播,在2~10号协程中全部退出。

简单的了解context包的使用后,可以看一下context.Context这个接口,为了简洁,我删除了源代码中的注释。

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}

Context接口总共提供了4个方法。

  • Deadline()用来获取当前context的取消时间,第二个返回值ok等于false的时候,表示没有设置。
  • Done()方法返回了一个chan,当chan中读取到值的时候,表示父context已经发起了取消的请求,那么当前协程开始做相关的清理工作然后退出。
  • Err()返回context的取消原因
  • Value()方法用来通过一个key获取当前Context上与之对应的值。

理解Context的树形结构

go中大量的库都使用了context机制,比如database/sql库,net/http库等等,因为这些库都支持了context,使得我们在程序中很容易通过context来管理所有新建的协程,而不用自己实现复杂的机制来管理。一旦我们需要取消,只需要在root context调用cancel方法即可。

一些基本使用

在上面的例子中,我们使用了withCancel来实例化一个可以手动取消的context。context包中同样提供了一些其他的方法。

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithDeadline可以设置截止时间。会到达指定时间时自动取消。当然也可以调用CancelFunc来手动取消。

WithTimeout可以设置在一段时间后自动取消,和WithDeadline类似。

参考

Go语言实战笔记(二十)
Golang Context深入理解
Go Concurrency Patterns: context

etcd分布式锁的实现方式

一、概述

在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是类似的,为了了解其中的工作机制,这里简要的做一下总结。

二、使用方式

etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下几个方法:

  • func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
  • func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
  • func (m *Mutex) Unlock(ctx context.Context) error,解锁

因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。

一个简单的例子如下:

package main

import (
    "context"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "sync"
    "time"
)

var n = 0

// 使用worker模拟锁的抢占
func worker(key string) error {
    endpoints := []string{"127.0.0.1:2379"}

    cfg := clientv3.Config{
        Endpoints:            endpoints,
        DialTimeout:          3 * time.Second,
    }

    cli, err := clientv3.New(cfg)
    if err != nil {
        log.Println("new cli error:", err)
        return err
    }

    sess, err := concurrency.NewSession(cli)
    if err != nil {
        return err
    }

    m := concurrency.NewMutex(sess, "/"+key)

    err = m.Lock(context.TODO())
    if err != nil {
        log.Println("lock error:", err)
        return err
    }

    defer func() {
        err = m.Unlock(context.TODO())
        if err != nil {
            log.Println("unlock error:", err)
        }
    }()

    log.Println("get lock: ", n)
    n++
    time.Sleep(time.Second) // 模拟执行代码


    return nil
}

func main() {
    var wg sync.WaitGroup
    wg.Add(3)
    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()


    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    go func() {
        defer wg.Done()
        err := worker("lockname")
        if err != nil {
            log.Println(err)
        }
    }()

    wg.Wait()
}

三、实现机制

Lock()函数的实现很简单。这里可以贴出来看一下:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmpputgetgetOwner。需要注意的是,key是由pfxLease()组成的。

  • cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
  • put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
  • get: get就是通过key来查询
  • getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate()。使用pfx来查询是因为其他的session也会用同样的pfx来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put成功。但是只有最早使用这个pfxsession才是持有锁的,所以这个getOwner的含义就是这样的。

接下来才是通过判断来检查是否持有锁

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是当前的版本号,resp.Succeededcmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。

下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。

// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
    m.Unlock(client.Ctx())
} else {
    m.hdr = hdr
}

上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}

waitDeletes方法的实现也很简单,但是需要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。

这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx来put,然后根据各自的版本号来排队获取锁。效率非常的高。

etcd 分布式锁

如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。

当然,这里为什么可以通过revision来判定获取锁的顺序,就需要更深入的了解etcd的内部机制以及raft协议了。