1. API Server 源码简介
# 视频地址 (opens new window)
# 准备工作
- 推荐书籍

# 目录梳理
./cmd/kube-apiserverAPI Server 可执行程序的主入口,基于 cobra,主要负责接受命令行参数,也是代码学习的入口
./vendor/k8s.io和./staging/src/k8s.iovendor 机制是老一代版本依赖管理机制,module 是新一代。不过 vendor 目录存在还是会被优先使用;staging 中包含正在被单独抽离的组件,软引用到 vendor 下
./pkg大部分源码所在地,除了被抽离为单独组件的部分,例如 api-server 的代码,proxy 相关的代码,kubelet 组件的代码等
./pkg/api和./pkg/apisapi 文件下包含 OpenAPI 相关的模型定义等内容,用于根据 OpenAPI 规范形成符合规定的 API
apis 是包含内建 API Groups 和 API Objects 的,和 scheme 相关的代码大部分在这里
./pluginKubernetes 内建的 plugin 实现,包含 admission 和 auth 两个部分
# API Server 启动过程
重点
API ObjectKubernetes 内部管理的基本元素,是 k8s 在 ETCD 中信息存储单元。
例如:Deployment、Pod、Service 都是 API Object,代码内部也常用 "API" 来称呼他们。
API Group一组 API Object 组成的一个具有共性的对象集合。
例如:apps 这个 group 它是由 Deployment、ReplicaSet、StatefulSet 等 API Object 组成的
Legacy API Object绝大多数的 API Object 都被归在某个 API Group 下面,特别是新版本引入的一定会遵从这一原则,但在 k8s 项目初始化阶段引入的 API Object 没有显示定义在 API Group。例如 Pod
Event、Node 等。在代码中有时也称呼他们为 "core" API Object
示例 (kube-apiserver)
func NewAPIServerCommand() *cobra.Command {
s := options.NewServerRunOptions()
cmd := &cobra.Command{
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
// stop printing usage when the command errors
SilenceUsage: true,
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := s.Logs.ValidateAndApply(utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
fs := cmd.Flags()
namedFlagSets := s.Flags()
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
for _, f := range namedFlagSets.FlagSets {
fs.AddFlagSet(f)
}
cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
return cmd
}
# 代码预览
# 目录结构
tree cmd/kube-apiserver
.
├── apiserver.go
├── app
│ ├── aggregator.go
│ ├── apiextensions.go
│ ├── options
│ │ ├── globalflags.go
│ │ ├── globalflags_providerless.go
│ │ ├── globalflags_providers.go
│ │ ├── globalflags_test.go
│ │ ├── options.go
│ │ ├── options_test.go
│ │ ├── validation.go
│ │ └── validation_test.go
│ ├── server.go
│ ├── server_test.go
│ └── testing
│ ├── testdata
│ │ ├── 127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.crt
│ │ ├── 127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.key
│ │ └── README.md
│ └── testserver.go
└── OWNERS
# 主程序
// cmd/kube-apiserver/apiserver.go
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
# 命令主体
// cmd/kube-apiserver/app/server.go
cmd := &cobra.Command{
// Use Long 描述用途
Use: "kube-apiserver",
Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
// stop printing usage when the command errors
SilenceUsage: true,
// 在 Run 之前运行一些前置的指令,类似于拦截器
PersistentPreRunE: func(*cobra.Command, []string) error {
// silence client-go warnings.
// kube-apiserver loopback clients should not log self-issued warnings.
rest.SetDefaultWarningHandler(rest.NoWarnings{})
return nil
},
// 最终执行的指令
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
fs := cmd.Flags()
// Activate logging as soon as possible, after that
// show flags with the final logging configuration.
if err := s.Logs.ValidateAndApply(utilfeature.DefaultFeatureGate); err != nil {
return err
}
cliflag.PrintFlags(fs)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
return Run(completedOptions, genericapiserver.SetupSignalHandler())
},
// 检验 args 是否满足需求
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
# 调用流程
# 基本启动流程
cmd/kube-apiserver/app/server.go (opens new window)
Server Chain
- Aggregation Server
负责转发请求到 Master 或 Custom API Server
- Master
Kube API Server,负责 Build-in 的 API Object 相关的处理
例如:Pod,Event,apps ...
- Extebsion Server
Customer Resource 的处理由它完成,包括 CR 和 CRD
- Not Found Handler
找不到相应的 API Object 的时候,返回 404
提示
在真正构建的时候会生成一个 pipeline 是从右向左,依次构建,当前层处理不了的请求流向下一层,直到 Not Found Handler
# RunE 启动
cmd/kube-apiserver/app/server.go (opens new window)
# Master 中装载 API
API Server 的 "内容" 是 API Object,通过 Restful 服务对外提供操作 API Object 的能力
那么 API Server 是如何建立起针对各个 API Object 的服务呢?
# Scheme 详解
# 构造并填充 Scheme
Scheme: 存储当前 API Server 知道的 API Group (类似于 Windows 注册表)
Scheme 是如何填充的呢?
# 概念词组
Version
每个 API Group 都会有一个 version。每一个 version 会包含多个 kind (一个 kind 会出现在多个 version 下)
这些 Version 又称为 External Version,他们面向 API Server 外部
Internal Version 是 API Server 在内部程序中处理数据时对 API Object 的实际类型
GVK
Group、Veersion、Kind 这三个元素唯一确定了一个 Kind。程序中,GVK 可以理解为一个字符串,三者拼接的结果(程序中是一个含三字段的结构体)
"Type"
代码中常见的 "Type",例如 gvk2Type 字段。这里的 Type 是一个 API Object 结构体类型(元数据),是程序中处理一个 Object 时使用的数据结构
# Scheme 作用和应用场景
Scheme 是一个结构体,内含处理内外部 Version 之间的转换,GVK 和 GoType 之间转换时用的数据和方法
GVK 与 Go Type 之间的转换
Scheme 内部又两个
map,分别对应 gvk → type 和 type → gvk;使得二者可以互相找到API Object 的默认值
API Object 有许多属性,使用者在操作一个 object 时,不太可能给出所有的属性值;另外在 Object 从一个 Version 转换到另一个 Version 时也可能不需要为不存在对应关系的字段填值
内外部 Version 之间的 Convert
所有外部的 Version 都会转换为 内部 Version,转换函数时记录在 Scheme 之内的
Scheme 代码

Scheme 代码
// vendor/k8s.io/apimachinery/pkg/runtime/scheme.go
type Scheme struct {
// gvkToType allows one to figure out the go type of an object with
// the given version and name.
gvkToType map[schema.GroupVersionKind]reflect.Type
// typeToGVK allows one to find metadata for a given go object.
// The reflect.Type we index by should *not* be a pointer.
typeToGVK map[reflect.Type][]schema.GroupVersionKind
// unversionedTypes are transformed without conversion in ConvertToVersion.
unversionedTypes map[reflect.Type]schema.GroupVersionKind
// unversionedKinds are the names of kinds that can be created in the context of any group
// or version
// TODO: resolve the status of unversioned types.
unversionedKinds map[string]reflect.Type
// Map from version and resource to the corresponding func to convert
// resource field labels in that version to internal version.
fieldLabelConversionFuncs map[schema.GroupVersionKind]FieldLabelConversionFunc
// defaulterFuncs is a map to funcs to be called with an object to provide defaulting
// the provided object must be a pointer.
defaulterFuncs map[reflect.Type]func(interface{})
// converter stores all registered conversion functions. It also has
// default converting behavior.
converter *conversion.Converter
// versionPriority is a map of groups to ordered lists of versions for those groups indicating the
// default priorities of these versions as registered in the scheme
versionPriority map[string][]string
// observedVersions keeps track of the order we've seen versions during type registration
observedVersions []schema.GroupVersion
// schemeName is the name of this scheme. If you don't specify a name, the stack of the NewScheme caller will be used.
// This is useful for error reporting to indicate the origin of the scheme.
schemeName string
}
# 填充 Scheme 代码 —— Internal Version

# 填充 Scheme 代码 —— External Version

# Converter 和 Defaulter 的代码生成
