首页
Kubernetes 源码开发之旅
GitHub (opens new window)

brook-w

K8s 源码学习、二次开发、自定义组件开发
首页
Kubernetes 源码开发之旅
GitHub (opens new window)
  • (一)环境搭建
  • (二)编译运行并调试源码
  • (三)API Server 源码刨析

    • 1. API Server 源码简介
    • 2. Generic Server
      • 视频地址
      • Generic Server 定位
      • Generic Server 的装配
        • 基础设施
        • Request Handler 的构建
        • Server 链条的形成
        • API Resource 的装配
      • HTTP Server 的启动
      • Open API 与 Generic Server
        • Open API
    • 3. Master API Server
    • 4. Extension API Server
    • 5. Aggregator Server
    • 6. Admission
    • 7. Http Req 处理过程和 Default Filters
    • 8. 登录与鉴权的实现
  • (四)Aggregated API Server 的实现
  • Kubernetes 源码开发之旅
  • (三)API Server 源码刨析
brook-w
2022-12-05
目录

2. Generic Server

# 视频地址 (opens new window)

# Generic Server 定位

Generic Server 是什么?它能干什么?

  • 提供暴露 http 服务所需的基础设施

各个内部 Server 做的事情是一致的:对外提供 restful 服务来操作 API Object。所以大框架上大家都是一致的:需要去实现 Http Restful 服务,大家都需要 http server,那么这里可以集中提供

各个内部 Server 相互连接,形成处理链,这同样需要有实体来负责

  • 统一各种处理机制

对于同一个事项,不同的内部 Server 应采取同样的处理方式

例如 API Resource 的对外暴露形式:登录鉴权机制

  • 避免重复

大量的功能是可以复用复用的

每个内部 Server 都是建立在 Generic Server 之上,把自己的 “内容” 填入 Generic Server

每一个 Generic Server 最重要的输入是一个叫 Director 的东西,他本质上是 一个 mux 和一个 go container 的组合,所有的 http request 最终都是被这些 Director 处理的

# Generic Server 的装配

# 基础设施

  • Generic Server handler 是如何被定义出来的?
  • 一个 http 服务或者一组服务如何处理公共逻辑,比如:鉴权等?

image

# Request Handler 的构建

image

image

# Server 链条的形成

image

# API Resource 的装配

  • registerResourceHandlers

输入:由如下代码段在确定当前 API Object 支持何种 Http 操作

// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
gvAcceptor, _ := storage.(rest.GroupVersionAcceptor)

根据之前收集的数据,确定支持的 action,做成数组待用。同一个 Resource 的不同操作方法(如 List、Post)都会有一个 action 与之对应

这里是当前资源为有 namespace 的情况

switch {
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		suffix := ""
		if isSubresource {
			suffix = "/" + subresource
			itemPath = itemPath + suffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = false
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			Namer:         a.group.Namer,
			ClusterScoped: true,
		}

		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		// Add actions at the resource path: /api/apiVersion/resource
		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		// Add actions at the item path: /api/apiVersion/resource/{name}
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)
	default:
		namespaceParamName := "namespaces"
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
		namespacedPath := namespaceParamName + "/{namespace}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}

		resourcePath := namespacedPath
		resourceParams := namespaceParams
		itemPath := namespacedPath + "/{name}"
		nameParams := append(namespaceParams, nameParam)
		proxyParams := append(nameParams, pathParam)
		itemPathSuffix := ""
		if isSubresource {
			itemPathSuffix = "/" + subresource
			itemPath = itemPath + itemPathSuffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = true
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			Namer:         a.group.Namer,
			ClusterScoped: false,
		}

		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		// DEPRECATED in 1.11
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

		// list or post across namespace.
		// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
		// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
		if !isSubresource {
			actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
			// DEPRECATED in 1.11
			actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
		}
	}

计算向 etcd 写入时(encode)使用的 version 和读出(decode)时可转换为的 version

var resourceInfo *storageversion.ResourceInfo
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
    utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) &&
    isStorageVersionProvider &&
    storageVersionProvider.StorageVersion() != nil {

    versioner := storageVersionProvider.StorageVersion()
    encodingGVK, err := getStorageVersionKind(versioner, storage, a.group.Typer)
    if err != nil {
        return nil, nil, err
    }
    decodableVersions := []schema.GroupVersion{}
    if a.group.ConvertabilityChecker != nil {
        decodableVersions = a.group.ConvertabilityChecker.VersionsForGroupKind(fqKindToRegister.GroupKind())
    }
    resourceInfo = &storageversion.ResourceInfo{
        GroupResource: schema.GroupResource{
            Group:    a.group.GroupVersion.Group,
            Resource: apiResource.Name,
        },
        EncodingVersion: encodingGVK.GroupVersion().String(),
        // We record EquivalentResourceMapper first instead of calculate
        // DecodableVersions immediately because API installation must
        // be completed first for us to know equivalent APIs
        EquivalentResourceMapper: a.group.EquivalentResourceRegistry,

        DirectlyDecodableVersions: decodableVersions,
    }
}

针对上面准备的每一个 action 都会被向目标 webservice 中加入一个 route,例如针对 POST:

case "POST": // Create a resource.
    var handler restful.RouteFunction
    if isNamedCreater {
        handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
    } else {
        handler = restfulCreateResource(creater, reqScope, admit)
    }
    handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
    handler = utilwarning.AddWarningsHandler(handler, warnings)
    article := GetArticleForNoun(kind, " ")
    doc := "create" + article + kind
    if isSubresource {
        doc = "create " + subresource + " of" + article + kind
    }
    route := ws.POST(action.Path).To(handler).
        Doc(doc).
        Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
        Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
        Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
        Returns(http.StatusOK, "OK", producedObject).
        // TODO: in some cases, the API may return a v1.Status instead of the versioned object
        // but currently go-restful can't handle multiple different objects being returned.
        Returns(http.StatusCreated, "Created", producedObject).
        Returns(http.StatusAccepted, "Accepted", producedObject).
        Reads(defaultVersionedObject).
        Writes(producedObject)
    if err := AddObjectParams(ws, route, versionedCreateOptions, disabledParams...); err != nil {
        return nil, nil, err
    }
    addParams(route, action.Params)
    routes = append(routes, route)

# HTTP Server 的启动

  • Server 内生命周期状态流转 preparedGenericAPIServer.Run()
  • preParedGenericAPIService.NonBlockingRun

# Open API 与 Generic Server

# Open API

Open API 是什么? (opens new window)

Open API 规范 (opens new window)

Open API 3.0 博客 (opens new window)

Open API:是由 swagger 发展而来的一种规范,一种形式化描述 restful service 的“语言”,便于客户端理解和使用的一个 Service。通过 Open API,可以描述一个服务:

  • 提供哪些 restful
  • 提供服务接受的输入以及输出的对象格式
  • 支持的操作,如 get、post、update、delete 等
上次更新: 2023/02/15, 03:43:27
1. API Server 源码简介
3. Master API Server

← 1. API Server 源码简介 3. Master API Server→

最近更新
01
概述
02-10
02
(四)Aggregated API Server 的实现
02-10
03
8. 登录与鉴权的实现
02-09
更多文章>
友站: www.brook-w.com
Theme by Vdoing | Copyright © 2019-2023 Brook-w | GPL License
京ICP备2020045721号-2
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式