01. kubectl执行apply命令的源码解析

发布于 2022年 02月 21日 05:06

概述

我们在学习kubernetes除了要了解其相关概念完,我们还需要深入了解整个kubernetes的实现机制是如何,如果还能了解其源码,那基本上我们才算是对kubernetes很熟悉吧。我将用kubernetes是如何生成一个deployment的资源,并且附带源码解读的方式讲解kubernetes的整个实现机制。

这篇文章将讲解一个deploy的yaml文件,kubectl是如何发送到apiserver的。

使用命令

kubectl apply -f deploy.yaml

交互流程

  1. 根据用户执行参数,生成ApplyOption实例
  2. 将deploy.yml文件以及本地配置信息构造成一个Result实例
  3. 判断是否存在该deployment资源,如果没有则创建
  4. 再次进行一个资源的对比,如果存在差异则进行patch

数据结构

里面有一些关键的数据结构,我们再分析的时候可以多注意下。

  • ApplyOptions
  • Result
  • Info
  • Command

源码分析

关键路径

Cmd/kubectl/kubectl.go

main()主函数

func main() {
   rand.Seed(time.Now().UnixNano())

   command := cmd.NewDefaultKubectlCommand()

   // TODO: once we switch everything over to Cobra commands, we can go back to calling
   // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
   // normalize func and add the go flag set by hand.
   pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
   pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
   // utilflag.InitFlags()
   logs.InitLogs()
   defer logs.FlushLogs()

   if err := command.Execute(); err != nil {
      fmt.Fprintf(os.Stderr, "%v\n", err)
      os.Exit(1)
   }
}

main函数最终要构造的command实例,实际上是在kubernetes/pkg/kubectl/cmd/cmd.go中实现。

command := cmd.NewDefaultKubectlCommand()
>> NewDefaultKubectlCommandWithArgs()
>> NewKubectlCommand()

操作类型分发命令

kubernetes/pkg/kubectl/cmd/cmd.go

函数NewKubectlCommand,通过传入参数,对参数进行过滤,匹配,然后分发到对应的函数中。

// NewKubectlCommand creates the `kubectl` command and its nested children.
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
	// Parent command to which all subcommands are added.
  // 定义一个cobra的command结构体
	cmds := &cobra.Command{
		...
	}
  // 定义可用于当前命令及其子命令的比变量
	flags := cmds.PersistentFlags()
  // 将参数中的"_"替换成"-"
	flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags
	flags.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
  // 性能相关参数的输入输出配置
	addProfilingFlags(flags)
  // 生成kubconfig配置
	kubeConfigFlags := genericclioptions.NewConfigFlags()
	kubeConfigFlags.AddFlags(flags)
	matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
	matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())

	cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)

  // 生成一个工厂函数
	f := cmdutil.NewFactory(matchVersionKubeConfigFlags)

	// Sending in 'nil' for the getLanguageFn() results in using
	// the LANG environment variable.
	//
	// TODO: Consider adding a flag or file preference for setting
	// the language, instead of just loading from the LANG env. variable.
	i18n.LoadTranslations("kubectl", nil)

	// From this point and forward we get warnings on flags that contain "_" separators
	cmds.SetGlobalNormalizationFunc(utilflag.WarnWordSepNormalizeFunc)

	ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}

  // 生成命令行
	groups := templates.CommandGroups{
	   ...
		{
			Message: "Advanced Commands:",
			Commands: []*cobra.Command{
				diff.NewCmdDiff(f, ioStreams),
        // 这里就是我们要分析的apply命令
				apply.NewCmdApply("kubectl", f, ioStreams),
				patch.NewCmdPatch(f, ioStreams),
				replace.NewCmdReplace(f, ioStreams),
				wait.NewCmdWait(f, ioStreams),
				convert.NewCmdConvert(f, ioStreams),
			},
		},
		...
	}
	groups.Add(cmds)
	...

	return cmds
}

获取用户输入

kubernetes/pkg/kubectl/cmd/apply.go

函数NewCmdApply

func NewCmdApply(baseName string, f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
  // 生成操作方法的结构体实例
	o := NewApplyOptions(ioStreams)

	// Store baseName for use in printing warnings / messages involving the base command name.
	// This is useful for downstream command that wrap this one.
	o.cmdBaseName = baseName

  // 生成命令
	cmd := &cobra.Command{
		Use:                   "apply -f FILENAME",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Apply a configuration to a resource by filename or stdin"),
		Long:                  applyLong,
		Example:               applyExample,
		Run: func(cmd *cobra.Command, args []string) {
      // 检验命令行的参数
			cmdutil.CheckErr(o.Complete(f, cmd))
			cmdutil.CheckErr(validateArgs(cmd, args))
			cmdutil.CheckErr(validatePruneAll(o.Prune, o.All, o.Selector))
      // 执行命令
			cmdutil.CheckErr(o.Run())
		},
	}
	...

	return cmd
}

构造Result实例

kubernetes/pkg/kubectl/cmd/apply.go

Run函数中,我们先看如何通过Builder构造一个Result实例。

func (o *ApplyOptions) Run() error {
	var openapiSchema openapi.Resources
	if o.OpenAPIPatch {
		openapiSchema = o.OpenAPISchema
	}

	dryRunVerifier := &DryRunVerifier{
		Finder:        cmdutil.NewCRDFinder(cmdutil.CRDFromDynamic(o.DynamicClient)),
		OpenAPIGetter: o.DiscoveryClient,
	}

	// include the uninitialized objects by default if --prune is true
	// unless explicitly set --include-uninitialized=false
  // 资源结构转换
	r := o.Builder.
		Unstructured().
		Schema(o.Validator).
		ContinueOnError().
		NamespaceParam(o.Namespace).DefaultNamespace().
		FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions). // 获取资源文件方式
		LabelSelectorParam(o.Selector).  // 设置用户的标签选择
		IncludeUninitialized(o.ShouldIncludeUninitialized). 
		Flatten().
		Do()
	if err := r.Err(); err != nil {
		return err
	}
	...
	// 后面这段函数,我们暂且布标,先追踪Builder看下去。
}

这里用到的是第三方模块

k8s.io/cli-runtime/pkg/genericclioptions/resource/builder.go

Do()函数

func (b *Builder) Do() *Result {
	r := b.visitorResult()
	r.mapper = b.Mapper()
	if r.err != nil {
		return r
	}
	if b.flatten {
		r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
	}
	helpers := []VisitorFunc{}
  // 注册获取数据前的动作
	if b.defaultNamespace {
		helpers = append(helpers, SetNamespace(b.namespace))
	}
	if b.requireNamespace {
		helpers = append(helpers, RequireNamespace(b.namespace))
	}
	helpers = append(helpers, FilterNamespace)
	if b.requireObject {
    // 注册从Apiserver获取数据的方法
		helpers = append(helpers, RetrieveLazy)
	}
  //注册从返回数据中提取信息方法
	if b.continueOnError {
		r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
	} else {
		r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
	}
	return r
}

如下所示 RetrieveLazy中有获取数据的操作

// RetrieveLazy updates the object if it has not been loaded yet.
func RetrieveLazy(info *Info, err error) error {
	if err != nil {
		return err
	}
	if info.Object == nil {
		return info.Get()
	}
	return nil
}

info获取数据的方法,将获取的信息存入info结构体的Object中。

// Get retrieves the object from the Namespace and Name fields
func (i *Info) Get() (err error) {
   obj, err := NewHelper(i.Client, i.Mapping).Get(i.Namespace, i.Name, i.Export)
   if err != nil {
      if errors.IsNotFound(err) && len(i.Namespace) > 0 && i.Namespace != metav1.NamespaceDefault && i.Namespace != metav1.NamespaceAll {
         err2 := i.Client.Get().AbsPath("api", "v1", "namespaces", i.Namespace).Do().Error()
         if err2 != nil && errors.IsNotFound(err2) {
            return err2
         }
      }
      return err
   }
   i.Object = obj
   i.ResourceVersion, _ = metadataAccessor.ResourceVersion(obj)
   return nil
}

而 NewDecoratedVisitor 方法注册了数据处理的关键函数 Visit, 这个函数可以使用户可以将来自Apiserver的数据转化为通用数据集合。在后面,我们将再看看Visit函数是如何被使用的。

// NewDecoratedVisitor will create a visitor that invokes the provided visitor functions before
// the user supplied visitor function is invoked, giving them the opportunity to mutate the Info
// object or terminate early with an error.
func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
   if len(fn) == 0 {
      return v
   }
   return DecoratedVisitor{v, fn}
}

// Visit implements Visitor
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
	return v.visitor.Visit(func(info *Info, err error) error {
		if err != nil {
			return err
		}
		for i := range v.decorators {
			if err := v.decorators[i](info, nil); err != nil {
				return err
			}
		}
		return fn(info, nil)
	})
}

这里其实在设计模式上使用了访问者模式,访问者模式建议将新行为放入一个名为访问者的独立类中, 而不是试图将其整合到已有类中。 现在, 需要执行操作的原始对象将作为参数被传递给访问者中的方法, 让方法能访问对象所包含的一切必要数据。可以参考:refactoringguru.cn/design-patt…

在这里,我们要获取kubernetes中所创建的资源,我们则需要构造一个访问者,然后将访问方法和所访问的资源实例作为参数传入其中。

执行命令

kubernetes/pkg/kubectl/cmd/apply.go

Run函数中,我们继续往下看,整个主要执行逻辑,还是先判断是否存在该资源,如果没有,则新建资源。再继续进行资源比较,如果存在差异,则进行patch。

func (o *ApplyOptions) Run() error {
  ...
  count := 0
  // 访问者模式, 这里就是前面构造的Visit
	err = r.Visit(func(info *resource.Info, err error) error {
	   ... 
		// Get the modified configuration of the object. Embed the result
		// as an annotation in the modified configuration, so that it will appear
		// in the patch sent to the server.
    // 获取差异配置
		modified, err := kubectl.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
		if err != nil {
			return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving modified configuration from:\n%s\nfor:", info.String()), info.Source, err)
		}

		// Print object only if output format other than "name" is specified
		printObject := len(output) > 0 && !shortOutput

    // 先获取是否存在该资源,如果没有,则新建资源
		if err := info.Get(); err != nil {
			if !errors.IsNotFound(err) {
				return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err)
			}

			// Create the resource if it doesn't exist
			// First, update the annotation used by kubectl apply
			if err := kubectl.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
				return cmdutil.AddSourceToErr("creating", info.Source, err)
			}

			if !o.DryRun {
				// Then create the resource and skip the three-way merge
				options := metav1.CreateOptions{}
				if o.ServerDryRun {
					options.DryRun = []string{metav1.DryRunAll}
				}
        // 如果不是dryrun模式下,则创建资源
				obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options)
				if err != nil {
					return cmdutil.AddSourceToErr("creating", info.Source, err)
				}
				info.Refresh(obj, true)
			}
    }
			...
		if !o.DryRun {
			...
		  // 使用patch的方式,变更资源,获取patch的信息
			patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
			if err != nil {
				return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
			}
      
			// 更新info信息
			info.Refresh(patchedObject, true)
			// 如果没有差异信息,则直接打印资源对象
			if string(patchBytes) == "{}" && !printObject {
				count++

				printer, err := o.ToPrinter("unchanged")
				if err != nil {
					return err
				}
				return printer.PrintObj(info.Object, o.Out)
			}
		}
	...

	return nil
}

小结

在kubectl执行apply的源码处理过程,我们对知识进行一下总结:

  1. kubectl使用cobra模块生成命令行
  2. kubectl的命令参数有很多,需要进行分发,apply对应的是NewCmdApply来返回command
  3. 在执行Run函数过程中,代码中用到了访问者的设计模式,通过访问者的模式对数据进行获取
  4. 无论是否新建,最终都会走patch的逻辑判断。

结束语

这是我这个kubernetes系列的第一篇文章,在这个过程中必然会有很多不严谨的地方,还希望大家包涵,大家吸取精华(如果有的话),去其糟粕。如果大家感兴趣可以关注我。我的微信号:lcomedy2021

参考文档

kubectl的describe源码分析:www.jianshu.com/p/e1ea277fa…

kubectl的create源码分析:blog.csdn.net/hahachenche…

kubectl 的设计模式: qiankunli.github.io/2018/12/23/…

访问者模式:refactoringguru.cn/design-patt…

推荐文章