01. kubectl执行apply命令的源码解析
发布于 2022年 02月 21日 05:06
概述
我们在学习kubernetes除了要了解其相关概念完,我们还需要深入了解整个kubernetes的实现机制是如何,如果还能了解其源码,那基本上我们才算是对kubernetes很熟悉吧。我将用kubernetes是如何生成一个deployment的资源,并且附带源码解读的方式讲解kubernetes的整个实现机制。
这篇文章将讲解一个deploy的yaml文件,kubectl是如何发送到apiserver的。
使用命令
kubectl apply -f deploy.yaml
交互流程
- 根据用户执行参数,生成ApplyOption实例
- 将deploy.yml文件以及本地配置信息构造成一个Result实例
- 判断是否存在该deployment资源,如果没有则创建
- 再次进行一个资源的对比,如果存在差异则进行patch
数据结构
里面有一些关键的数据结构,我们再分析的时候可以多注意下。
- ApplyOptions
- Result
- Info
- Command
源码分析
关键路径
Cmd/kubectl/kubectl.go
main()主函数
func main() {
rand.Seed(time.Now().UnixNano())
command := cmd.NewDefaultKubectlCommand()
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
main函数最终要构造的command实例,实际上是在kubernetes/pkg/kubectl/cmd/cmd.go中实现。
command := cmd.NewDefaultKubectlCommand()
>> NewDefaultKubectlCommandWithArgs()
>> NewKubectlCommand()
操作类型分发命令
kubernetes/pkg/kubectl/cmd/cmd.go
函数NewKubectlCommand,通过传入参数,对参数进行过滤,匹配,然后分发到对应的函数中。
// NewKubectlCommand creates the `kubectl` command and its nested children.
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
// Parent command to which all subcommands are added.
// 定义一个cobra的command结构体
cmds := &cobra.Command{
...
}
// 定义可用于当前命令及其子命令的比变量
flags := cmds.PersistentFlags()
// 将参数中的"_"替换成"-"
flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags
flags.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
// 性能相关参数的输入输出配置
addProfilingFlags(flags)
// 生成kubconfig配置
kubeConfigFlags := genericclioptions.NewConfigFlags()
kubeConfigFlags.AddFlags(flags)
matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)
// 生成一个工厂函数
f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
// Sending in 'nil' for the getLanguageFn() results in using
// the LANG environment variable.
//
// TODO: Consider adding a flag or file preference for setting
// the language, instead of just loading from the LANG env. variable.
i18n.LoadTranslations("kubectl", nil)
// From this point and forward we get warnings on flags that contain "_" separators
cmds.SetGlobalNormalizationFunc(utilflag.WarnWordSepNormalizeFunc)
ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
// 生成命令行
groups := templates.CommandGroups{
...
{
Message: "Advanced Commands:",
Commands: []*cobra.Command{
diff.NewCmdDiff(f, ioStreams),
// 这里就是我们要分析的apply命令
apply.NewCmdApply("kubectl", f, ioStreams),
patch.NewCmdPatch(f, ioStreams),
replace.NewCmdReplace(f, ioStreams),
wait.NewCmdWait(f, ioStreams),
convert.NewCmdConvert(f, ioStreams),
},
},
...
}
groups.Add(cmds)
...
return cmds
}
获取用户输入
kubernetes/pkg/kubectl/cmd/apply.go
函数NewCmdApply
func NewCmdApply(baseName string, f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
// 生成操作方法的结构体实例
o := NewApplyOptions(ioStreams)
// Store baseName for use in printing warnings / messages involving the base command name.
// This is useful for downstream command that wrap this one.
o.cmdBaseName = baseName
// 生成命令
cmd := &cobra.Command{
Use: "apply -f FILENAME",
DisableFlagsInUseLine: true,
Short: i18n.T("Apply a configuration to a resource by filename or stdin"),
Long: applyLong,
Example: applyExample,
Run: func(cmd *cobra.Command, args []string) {
// 检验命令行的参数
cmdutil.CheckErr(o.Complete(f, cmd))
cmdutil.CheckErr(validateArgs(cmd, args))
cmdutil.CheckErr(validatePruneAll(o.Prune, o.All, o.Selector))
// 执行命令
cmdutil.CheckErr(o.Run())
},
}
...
return cmd
}
构造Result实例
kubernetes/pkg/kubectl/cmd/apply.go
Run函数中,我们先看如何通过Builder构造一个Result实例。
func (o *ApplyOptions) Run() error {
var openapiSchema openapi.Resources
if o.OpenAPIPatch {
openapiSchema = o.OpenAPISchema
}
dryRunVerifier := &DryRunVerifier{
Finder: cmdutil.NewCRDFinder(cmdutil.CRDFromDynamic(o.DynamicClient)),
OpenAPIGetter: o.DiscoveryClient,
}
// include the uninitialized objects by default if --prune is true
// unless explicitly set --include-uninitialized=false
// 资源结构转换
r := o.Builder.
Unstructured().
Schema(o.Validator).
ContinueOnError().
NamespaceParam(o.Namespace).DefaultNamespace().
FilenameParam(o.EnforceNamespace, &o.DeleteOptions.FilenameOptions). // 获取资源文件方式
LabelSelectorParam(o.Selector). // 设置用户的标签选择
IncludeUninitialized(o.ShouldIncludeUninitialized).
Flatten().
Do()
if err := r.Err(); err != nil {
return err
}
...
// 后面这段函数,我们暂且布标,先追踪Builder看下去。
}
这里用到的是第三方模块
k8s.io/cli-runtime/pkg/genericclioptions/resource/builder.go
Do()函数
func (b *Builder) Do() *Result {
r := b.visitorResult()
r.mapper = b.Mapper()
if r.err != nil {
return r
}
if b.flatten {
r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
}
helpers := []VisitorFunc{}
// 注册获取数据前的动作
if b.defaultNamespace {
helpers = append(helpers, SetNamespace(b.namespace))
}
if b.requireNamespace {
helpers = append(helpers, RequireNamespace(b.namespace))
}
helpers = append(helpers, FilterNamespace)
if b.requireObject {
// 注册从Apiserver获取数据的方法
helpers = append(helpers, RetrieveLazy)
}
//注册从返回数据中提取信息方法
if b.continueOnError {
r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
} else {
r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
}
return r
}
如下所示 RetrieveLazy中有获取数据的操作
// RetrieveLazy updates the object if it has not been loaded yet.
func RetrieveLazy(info *Info, err error) error {
if err != nil {
return err
}
if info.Object == nil {
return info.Get()
}
return nil
}
info获取数据的方法,将获取的信息存入info结构体的Object中。
// Get retrieves the object from the Namespace and Name fields
func (i *Info) Get() (err error) {
obj, err := NewHelper(i.Client, i.Mapping).Get(i.Namespace, i.Name, i.Export)
if err != nil {
if errors.IsNotFound(err) && len(i.Namespace) > 0 && i.Namespace != metav1.NamespaceDefault && i.Namespace != metav1.NamespaceAll {
err2 := i.Client.Get().AbsPath("api", "v1", "namespaces", i.Namespace).Do().Error()
if err2 != nil && errors.IsNotFound(err2) {
return err2
}
}
return err
}
i.Object = obj
i.ResourceVersion, _ = metadataAccessor.ResourceVersion(obj)
return nil
}
而 NewDecoratedVisitor 方法注册了数据处理的关键函数 Visit, 这个函数可以使用户可以将来自Apiserver的数据转化为通用数据集合。在后面,我们将再看看Visit函数是如何被使用的。
// NewDecoratedVisitor will create a visitor that invokes the provided visitor functions before
// the user supplied visitor function is invoked, giving them the opportunity to mutate the Info
// object or terminate early with an error.
func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor {
if len(fn) == 0 {
return v
}
return DecoratedVisitor{v, fn}
}
// Visit implements Visitor
func (v DecoratedVisitor) Visit(fn VisitorFunc) error {
return v.visitor.Visit(func(info *Info, err error) error {
if err != nil {
return err
}
for i := range v.decorators {
if err := v.decorators[i](info, nil); err != nil {
return err
}
}
return fn(info, nil)
})
}
这里其实在设计模式上使用了访问者模式,访问者模式建议将新行为放入一个名为访问者的独立类中, 而不是试图将其整合到已有类中。 现在, 需要执行操作的原始对象将作为参数被传递给访问者中的方法, 让方法能访问对象所包含的一切必要数据。可以参考:refactoringguru.cn/design-patt…
在这里,我们要获取kubernetes中所创建的资源,我们则需要构造一个访问者,然后将访问方法和所访问的资源实例作为参数传入其中。
执行命令
kubernetes/pkg/kubectl/cmd/apply.go
Run函数中,我们继续往下看,整个主要执行逻辑,还是先判断是否存在该资源,如果没有,则新建资源。再继续进行资源比较,如果存在差异,则进行patch。
func (o *ApplyOptions) Run() error {
...
count := 0
// 访问者模式, 这里就是前面构造的Visit
err = r.Visit(func(info *resource.Info, err error) error {
...
// Get the modified configuration of the object. Embed the result
// as an annotation in the modified configuration, so that it will appear
// in the patch sent to the server.
// 获取差异配置
modified, err := kubectl.GetModifiedConfiguration(info.Object, true, unstructured.UnstructuredJSONScheme)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving modified configuration from:\n%s\nfor:", info.String()), info.Source, err)
}
// Print object only if output format other than "name" is specified
printObject := len(output) > 0 && !shortOutput
// 先获取是否存在该资源,如果没有,则新建资源
if err := info.Get(); err != nil {
if !errors.IsNotFound(err) {
return cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%s\nfrom server for:", info.String()), info.Source, err)
}
// Create the resource if it doesn't exist
// First, update the annotation used by kubectl apply
if err := kubectl.CreateApplyAnnotation(info.Object, unstructured.UnstructuredJSONScheme); err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}
if !o.DryRun {
// Then create the resource and skip the three-way merge
options := metav1.CreateOptions{}
if o.ServerDryRun {
options.DryRun = []string{metav1.DryRunAll}
}
// 如果不是dryrun模式下,则创建资源
obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object, &options)
if err != nil {
return cmdutil.AddSourceToErr("creating", info.Source, err)
}
info.Refresh(obj, true)
}
}
...
if !o.DryRun {
...
// 使用patch的方式,变更资源,获取patch的信息
patchBytes, patchedObject, err := patcher.Patch(info.Object, modified, info.Source, info.Namespace, info.Name, o.ErrOut)
if err != nil {
return cmdutil.AddSourceToErr(fmt.Sprintf("applying patch:\n%s\nto:\n%v\nfor:", patchBytes, info), info.Source, err)
}
// 更新info信息
info.Refresh(patchedObject, true)
// 如果没有差异信息,则直接打印资源对象
if string(patchBytes) == "{}" && !printObject {
count++
printer, err := o.ToPrinter("unchanged")
if err != nil {
return err
}
return printer.PrintObj(info.Object, o.Out)
}
}
...
return nil
}
小结
在kubectl执行apply的源码处理过程,我们对知识进行一下总结:
- kubectl使用cobra模块生成命令行
- kubectl的命令参数有很多,需要进行分发,apply对应的是NewCmdApply来返回command
- 在执行Run函数过程中,代码中用到了访问者的设计模式,通过访问者的模式对数据进行获取
- 无论是否新建,最终都会走patch的逻辑判断。
结束语
这是我这个kubernetes系列的第一篇文章,在这个过程中必然会有很多不严谨的地方,还希望大家包涵,大家吸取精华(如果有的话),去其糟粕。如果大家感兴趣可以关注我。我的微信号:lcomedy2021
参考文档
kubectl的describe源码分析:www.jianshu.com/p/e1ea277fa…
kubectl的create源码分析:blog.csdn.net/hahachenche…
kubectl 的设计模式: qiankunli.github.io/2018/12/23/…