argo的输入输出源代码分析

简介

argo是一个工作流的调度引擎,支持 Steps 和 DAG 这两种工作流。

  • Steps: 是按照步骤,从前往后的工作流调度方案。工作流中的每一步都只依赖上一步的结果
  • DAG: 全称是 directed acyclic graph,译为有向无环图。与 Steps 的区别在于每一步可能依赖之前的多步输出,但是不会循环依赖(也就是无环)

不论是在什么类型的工作流上,argo都抽象出了两种输入输出:

  • parameters: 通常情况下都是字符串,该字符串可以来源于标准输出,也可以来源于文件的内容
  • artifacts: 可以理解成文件

输入输出是连接整个工作流的核心。每一步都可以看作是一次函数调用。那么在argo中,它是如何实现在多步之间输入输出的传输呢?下面会通过源代码进行分析。

在看代码之前,可以看一个 argo 的工作流中的一个pod,为了查看更方便,我删除一些不需要关注的字段:

$ kubectl -n workflow describe pods custom-workflow-111-2fw2f-2639432629

Name:           custom-workflow-111-2fw2f-2639432629
Namespace:      workflow
Labels:         pipeline.starx.com/nodeID=743
                workflows.argoproj.io/completed=true
                workflows.argoproj.io/workflow=custom-workflow-111-2fw2f
Annotations:    cni.projectcalico.org/podIP: 10.42.0.83/32
                workflows.argoproj.io/node-name: custom-workflow-111-2fw2f.yolov3-evaluate-743
                workflows.argoproj.io/outputs:
                  {"result":...
                workflows.argoproj.io/template:
                  {"name":"yolov3-evaluate-743","inputs":{"parameters":[{"name":"userParam","value":"eyJTY29yZVRocmVzaG9sZCI6MC41LCJJb3VfVGhyZXNob2xkIjowLjQ...
Controlled By:  Workflow/custom-workflow-111-2fw2f
Init Containers:
  init:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      init
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/inputs/artifacts from input-artifacts (rw)
      /argo/podmetadata from podmetadata (rw)
      /argo/staging from argo-staging (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
Containers:
  wait:
    Image:         argoproj/argoexec:v2.3.0
    Command:
      argoexec
      wait
    Environment:
      ARGO_POD_NAME:  custom-workflow-111-2fw2f-2639432629 (v1:metadata.name)
    Mounts:
      /argo/podmetadata from podmetadata (rw)
      /mainctrfs/argo/staging from argo-staging (rw)
      /mainctrfs/tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /mainctrfs/tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
      /var/run/docker.sock from docker-sock (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-lfk5b (ro)
  main:
    Image:         registry.cn-shanghai.aliyuncs.com/xinhuodev/wt:0.4
    Command:
      sh
    Args:
      /argo/staging/script
    Mounts:
      /argo/staging from argo-staging (rw)
      /tmp/artifacts/artifact-input0 from input-artifacts (rw,path="artifact0")
      /tmp/artifacts/artifact-input1 from input-artifacts (rw,path="artifact1")
Volumes:
  podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations
  docker-sock:
    Type:          HostPath (bare host directory volume)
    Path:          /var/run/docker.sock
    HostPathType:  Socket
  input-artifacts:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  argo-staging:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  default-token-lfk5b:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-lfk5b
    Optional:    false

我们需要关注的信息有:

  • Pod 的 Annotations
  • Init Containers 启动的初始化容器
  • Containers 中的 wait 容器和 main 容器
  • Pod 的 Volumes 和每个容器的 Mounts

Init 容器

argo 创建的 Pod 的初始化容器执行了 argoexec init 命令,从名字上可以猜测出,这个容器负责初始化 Pod 中的环境,比如获取来上一步的输入等等,对应的代码是 cmd/argoexec/commands/init.go, 我们的分析也从这里开始。在执行 argo exec init之后,第一个调用的函数应该是loadArtifacts()。这个方法中做了三件事: initExecutor()wfExecutor.StageFiles()wfExecutor.LoadArtifacts()

initExecutor:

initExecutor 的代码如下(删除了不重要的代码):

func initExecutor() *executor.WorkflowExecutor {
    tmpl, err := executor.LoadTemplate(podAnnotationsPath)

    var cre executor.ContainerRuntimeExecutor
    switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
    case common.ContainerRuntimeExecutorK8sAPI:
        cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
    case common.ContainerRuntimeExecutorKubelet:
        cre, err = kubelet.NewKubeletExecutor()
    case common.ContainerRuntimeExecutorPNS:
        cre, err = pns.NewPNSExecutor(clientset, podName, namespace, tmpl.Outputs.HasOutputs())
    default:
        cre, err = docker.NewDockerExecutor()
    }

    wfExecutor := executor.NewExecutor(clientset, podName, namespace, podAnnotationsPath, cre, *tmpl)
    yamlBytes, _ := json.Marshal(&wfExecutor.Template)
    return &wfExecutor
}

podAnnotationsPath加载模板,这个模板其实就是 Argo 中单步的执行模板,默认情况下它的值是 /argo/podmetadata/annotations,这正好是 init 容器的挂载,而这个挂载对应的卷是:

 podmetadata:
    Type:  DownwardAPI (a volume populated by information about the pod)
    Items:
      metadata.annotations -> annotations

这里的 DownwardAPI 也解释一下,它是一种 volume 的类型,可以将 Pod 和 Container 的字段通过挂载文件的方式提供给容器内的进程方案。那么这里就是将 Pod 的 Annotations 字段通过上面的路径提供给 init 容器,init 容器根据其中的 template 获取该 Pod 的输入输出。

接下来判断根据容器运行时进行判断,这里我们只考虑 docker 作为容器运行时的情况。最后调用NewExecutor实例化了一个 wfExecutor

StageFiles()

源代码如下:

func (we *WorkflowExecutor) StageFiles() error {
    var filePath string
    var body []byte
    switch we.Template.GetType() {
    case wfv1.TemplateTypeScript:
        log.Infof("Loading script source to %s", common.ExecutorScriptSourcePath)
        filePath = common.ExecutorScriptSourcePath
        body = []byte(we.Template.Script.Source)
    case wfv1.TemplateTypeResource:
        log.Infof("Loading manifest to %s", common.ExecutorResourceManifestPath)
        filePath = common.ExecutorResourceManifestPath
        body = []byte(we.Template.Resource.Manifest)
    default:
        return nil
    }
    err := ioutil.WriteFile(filePath, body, 0644)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return nil
}

职责很简单,根据 template 的类型,写入到不同的文件中,比如 script 就写入到 /argo/staging/script。这就是我们在 main 容器中执行的脚本了。

LoadArtifacts

// LoadArtifacts loads artifacts from location to a container path
func (we *WorkflowExecutor) LoadArtifacts() error {
    for _, art := range we.Template.Inputs.Artifacts {
        artDriver, err := we.InitDriver(art)

        var artPath string
        mnt := common.FindOverlappingVolume(&we.Template, art.Path)
        if mnt == nil {
            artPath = path.Join(common.ExecutorArtifactBaseDir, art.Name)
        } else {
            // If we get here, it means the input artifact path overlaps with an user specified
            // volumeMount in the container. Because we also implement input artifacts as volume
            // mounts, we need to load the artifact into the user specified volume mount,
            // as opposed to the `input-artifacts` volume that is an implementation detail
            // unbeknownst to the user.
            log.Infof("Specified artifact path %s overlaps with volume mount at %s. Extracting to volume mount", art.Path, mnt.MountPath)
            artPath = path.Join(common.ExecutorMainFilesystemDir, art.Path)
        }

        // The artifact is downloaded to a temporary location, after which we determine if
        // the file is a tarball or not. If it is, it is first extracted then renamed to
        // the desired location. If not, it is simply renamed to the location.
        tempArtPath := artPath + ".tmp"
        err = artDriver.Load(&art, tempArtPath)
        if err != nil {
            return err
        }
        if isTarball(tempArtPath) {
            err = untar(tempArtPath, artPath)
            _ = os.Remove(tempArtPath)
        } else {
            err = os.Rename(tempArtPath, artPath)
        }

        if art.Mode != nil {
            err = os.Chmod(artPath, os.FileMode(*art.Mode))
        }
    }
    return nil
}

InitDriver是初始化 Artifacts 的驱动。Argo 支持多种类型的存储系统,在 v2.3.0 这个版本支持: s3, http, git, artifactory, hdfs, raw。

FindOverlappingVolume 是检查 artifacts 的路径和用户挂载的路径是否有重合。如果有,则返回深度最深的路径,如果没有,则返回 nil。如果返回 nil, 则使用 /argo/inputs/artifacts 作为 artifacts 的基础路径。否则使用 /mainctrfs 作为路径。

下面就是下载文件,解压文件并修改权限了。

注意在这里,init、wait和main容器都挂载了input-artifactsargo-staging,并且 init 将输入和script放在了这两个卷中,所以其他几个卷都可以共享这些文件。

wait 容器

wait容器的职责有以下几点:

  • 等待 main 容器结束
  • 杀死 sidecar
  • 保存日志
  • 保存 parameters
  • 保存 artifacts
  • 获取脚本的输出流
  • 将输出放在 Annotations 上

下面我们看这些功能点的实现:

等待 main 容器结束

// Wait is the sidecar container logic which waits for the main container to complete.
// Also monitors for updates in the pod annotations which may change (e.g. terminate)
// Upon completion, kills any sidecars after it finishes.
func (we *WorkflowExecutor) Wait() error {
    // WaitInit() 是初始化操作,只有 PSN 需要
    err := we.RuntimeExecutor.WaitInit()
    if err != nil {
        return err
    }
    log.Infof("Waiting on main container")
    // waitMainContainerStart的主要原理是周期轮询Pod中的所有容器,检查main容器的ContainerID字段
    // 不为空说明启动了
    mainContainerID, err := we.waitMainContainerStart()
    if err != nil {
        return err
    }
    log.Infof("main container started with container ID: %s", mainContainerID)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // monitorAnnotations是因为pod的annotations会更改
    annotationUpdatesCh := we.monitorAnnotations(ctx)
    // 超时会杀死
    go we.monitorDeadline(ctx, annotationUpdatesCh)

    // 这里是直接用ContainerRuntime去等待容器结束的,比如docker,直接调用docker wait
    err = we.RuntimeExecutor.Wait(mainContainerID)
    if err != nil {
        return err
    }
    log.Infof("Main container completed")
    return nil
}

杀死 sidecar

main 容器运行结束后,wait 容器会负责杀死其他容器(这个让我发现了之前用 sidecar 做 main 容器运行结束后的清理工作一直无效的原因)。

// KillSidecars kills any sidecars to the main container
func (we *WorkflowExecutor) KillSidecars() error {
    if len(we.Template.Sidecars) == 0 {
        log.Infof("No sidecars")
        return nil
    }
    log.Infof("Killing sidecars")
    pod, err := we.getPod()
    if err != nil {
        return err
    }
    sidecarIDs := make([]string, 0)
    // 遍历pod中的容器,排除main和wait,然后调用runtime来杀死容器
    for _, ctrStatus := range pod.Status.ContainerStatuses {
        if ctrStatus.Name == common.MainContainerName || ctrStatus.Name == common.WaitContainerName {
            continue
        }
        if ctrStatus.State.Terminated != nil {
            continue
        }
        containerID := containerID(ctrStatus.ContainerID)
        log.Infof("Killing sidecar %s (%s)", ctrStatus.Name, containerID)
        sidecarIDs = append(sidecarIDs, containerID)
    }
    if len(sidecarIDs) == 0 {
        return nil
    }
    return we.RuntimeExecutor.Kill(sidecarIDs)
}

保存日志

argo 是支持将 main 容器中的日志持久化并保存到指定的地方的(s3, hdfs, Artifactory)。这在 argo 的文档上好像没有提到过。这一部分的逻辑比较简单,就是通过 ContainerRuntime 获取获取容器中的输出流,然后存成文件,通过 argo 中的 storage driver 保存下来。

保存 parameters

// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters() error {
    if len(we.Template.Outputs.Parameters) == 0 {
        log.Infof("No output parameters")
        return nil
    }
    log.Infof("Saving output parameters")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    // 遍历模板参数
    for i, param := range we.Template.Outputs.Parameters {
        log.Infof("Saving path output parameter: %s", param.Name)
        // Determine the file path of where to find the parameter
        if param.ValueFrom == nil || param.ValueFrom.Path == "" {
            continue
        }

        var output string
        if we.isBaseImagePath(param.ValueFrom.Path) {
            log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
            // 容器内,通过 runtime 获取
            output, err = we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
            if err != nil {
                return err
            }
        } else {
            log.Infof("Copying %s from from volume mount", param.ValueFrom.Path)
            mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
            // 容器的挂载卷,直接获取
            out, err := ioutil.ReadFile(mountedPath)
            if err != nil {
                return err
            }
            output = string(out)
        }

        outputLen := len(output)
        // Trims off a single newline for user convenience
        if outputLen > 0 && output[outputLen-1] == '\n' {
            output = output[0 : outputLen-1]
        }
        // 保存下来
        we.Template.Outputs.Parameters[i].Value = &output
        log.Infof("Successfully saved output parameter: %s", param.Name)
    }
    return nil
}

保存 artifacts

保存 artifacts 和 保存 parameters 的操作是一样的。

// SaveArtifacts uploads artifacts to the archive location
func (we *WorkflowExecutor) SaveArtifacts() error {
    if len(we.Template.Outputs.Artifacts) == 0 {
        log.Infof("No output artifacts")
        return nil
    }
    log.Infof("Saving output artifacts")
    mainCtrID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }

    err = os.MkdirAll(tempOutArtDir, os.ModePerm)
    if err != nil {
        return errors.InternalWrapError(err)
    }

    for i, art := range we.Template.Outputs.Artifacts {
        err := we.saveArtifact(mainCtrID, &art)
        if err != nil {
            return err
        }
        we.Template.Outputs.Artifacts[i] = art
    }
    return nil
}

获取脚本的输出流

直接调用 runtime 去获取 main 容器的输出流,然后保存到 template.outputs 中

func (we *WorkflowExecutor) CaptureScriptResult() error {
    if we.Template.Script == nil {
        return nil
    }
    log.Infof("Capturing script output")
    mainContainerID, err := we.GetMainContainerID()
    if err != nil {
        return err
    }
    reader, err := we.RuntimeExecutor.GetOutputStream(mainContainerID, false)
    if err != nil {
        return err
    }
    defer func() { _ = reader.Close() }()
    bytes, err := ioutil.ReadAll(reader)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    out := string(bytes)
    // Trims off a single newline for user convenience
    outputLen := len(out)
    if outputLen > 0 && out[outputLen-1] == '\n' {
        out = out[0 : outputLen-1]
    }
    we.Template.Outputs.Result = &out
    return nil
}

将输出放在 Annotations 上

将 outputs 存在 pod 的 annotations 上。

func (we *WorkflowExecutor) AnnotateOutputs(logArt *wfv1.Artifact) error {
    outputs := we.Template.Outputs.DeepCopy()
    if logArt != nil {
        outputs.Artifacts = append(outputs.Artifacts, *logArt)
    }

    if !outputs.HasOutputs() {
        return nil
    }
    log.Infof("Annotating pod with output")
    outputBytes, err := json.Marshal(outputs)
    if err != nil {
        return errors.InternalWrapError(err)
    }
    return we.AddAnnotation(common.AnnotationKeyOutputs, string(outputBytes))
}

总结

init 容器做了 pod 的初始化,包括存储 script,下载 artifacts等等,这样我们的 main 容器就不用关心输入的来源,只需要在指定地方使用即可。wait 容器负责监控 main 容器的生命周期,在 main 容器中的主要逻辑运行结束之后,负责将输出部分读取,持久化,这样 main 容器就不用操心如何将该步产生的结果传到后面的步骤上的问题。