4. Extension API Server
# 视频地址 (opens new window)
# Extension Server
主要处理 CRD(Custom Resources Definition)
# CRD - 可以定义其他 API Object 的 API Object
代码实现 (createAPIExtensionsConfig)
func createAPIExtensionsConfig(
kubeAPIServerConfig genericapiserver.Config,
externalInformers kubeexternalinformers.SharedInformerFactory,
pluginInitializers []admission.PluginInitializer,
commandOptions *options.ServerRunOptions,
masterCount int,
serviceResolver webhook.ServiceResolver,
authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
) (*apiextensionsapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
// most of the config actually remains the same. We only need to mess with a couple items related to the particulars of the apiextensions
// Step.1.1 复制 Master Server Config
genericConfig := kubeAPIServerConfig
//Step.1.2 擦除 PostStartHooks
genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
// Step.1.3 擦除 RESTOptionsGetter
genericConfig.RESTOptionsGetter = nil
// override genericConfig.AdmissionControl with apiextensions' scheme,
// because apiextensions apiserver should use its own scheme to convert resources.
// Step 2. 制作 Extension Server Admission
err := commandOptions.Admission.ApplyTo(
&genericConfig,
externalInformers,
genericConfig.LoopbackClientConfig,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
return nil, err
}
// copy the etcd options so we don't mutate originals.
etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
// this is where the true decodable levels come from.
etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
// prefer the more compact serialization (v1beta1) for storage until http://issue.k8s.io/82292 is resolved for objects whose v1 serialization is too big but whose v1beta1 serialization can be stored
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
// Step 3. 制作 RESTOptionsGetter
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
// override MergedResourceConfig with apiextensions defaults and registry
// Step 4. 制作 APIEnablement
if err := commandOptions.APIEnablement.ApplyTo(
&genericConfig,
apiextensionsapiserver.DefaultAPIResourceConfigSource(),
apiextensionsapiserver.Scheme); err != nil {
return nil, err
}
// Step 5. 制作完毕并返回 apiextensionsConfig
apiextensionsConfig := &apiextensionsapiserver.Config{
GenericConfig: &genericapiserver.RecommendedConfig{
Config: genericConfig,
SharedInformerFactory: externalInformers,
},
ExtraConfig: apiextensionsapiserver.ExtraConfig{
CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
MasterCount: masterCount,
AuthResolverWrapper: authResolverWrapper,
ServiceResolver: serviceResolver,
},
}
// we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
apiextensionsConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
return apiextensionsConfig, nil
}
Extension Server Instance 生成实例代码
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
if err != nil {
return nil, err
}
// hasCRDInformerSyncedSignal is closed when the CRD informer this server uses has been fully synchronized.
// It ensures that requests to potential custom resource endpoints while the server hasn't installed all known HTTP paths get a 503 error instead of a 404
hasCRDInformerSyncedSignal := make(chan struct{})
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
return nil, err
}
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
apiResourceConfig := c.GenericConfig.MergedResourceConfig
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
storage := map[string]rest.Storage{}
// customresourcedefinitions
if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage[resource] = customResourceDefinitionStorage
storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
}
if len(storage) > 0 {
apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
}
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
if err != nil {
// it's really bad that this is leaking here, but until we can fix the test (which I'm pretty sure isn't even testing what it wants to test),
// we need to be able to move forward
return nil, fmt.Errorf("failed to create clientset: %v", err)
}
s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
delegateHandler := delegationTarget.UnprotectedHandler()
if delegateHandler == nil {
delegateHandler = http.NotFoundHandler()
}
versionDiscoveryHandler := &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
delegate: delegateHandler,
}
groupDiscoveryHandler := &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{},
delegate: delegateHandler,
}
establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
crdHandler, err := NewCustomResourceDefinitionHandler(
versionDiscoveryHandler,
groupDiscoveryHandler,
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
delegateHandler,
c.ExtraConfig.CRDRESTOptionsGetter,
c.GenericConfig.AdmissionControl,
establishingController,
c.ExtraConfig.ServiceResolver,
c.ExtraConfig.AuthResolverWrapper,
c.ExtraConfig.MasterCount,
s.GenericAPIServer.Authorizer,
c.GenericConfig.RequestTimeout,
time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
apiGroupInfo.StaticOpenAPISpec,
c.GenericConfig.MaxRequestBodyBytes,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)
discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
finalizingController := finalizer.NewCRDFinalizer(
s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
crdClient.ApiextensionsV1(),
crdHandler,
)
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
s.Informers.Start(context.StopCh)
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
// OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
// Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
// choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
if s.GenericAPIServer.StaticOpenAPISpec != nil {
if s.GenericAPIServer.OpenAPIVersionedService != nil {
openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
}
if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
}
}
go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh)
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}
return nil
})
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
// to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,
// but we won't go healthy until we can handle the ones already present.
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
close(hasCRDInformerSyncedSignal)
return true, nil
}
return false, nil
}, context.StopCh)
})
return s, nil
}
# Custom Resource 的 API Handler
上次更新: 2023/02/15, 03:43:27