
Kubernetes APIServer: Overall Source Code

Posted on 2024-01-16 | Category: Kubernetes

The Three Servers Inside the API Server

The native Kubernetes API Server contains three servers:

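  • Aggregator Server, which intercepts requests for the resource types defined in APIService objects and forwards them to the designated Aggregator APIServer for processing.

    An APIService defines a route to be added to the Aggregator Server: requests for a given group and version are routed to the corresponding Aggregator APIServer. The example below registers the endpoint auth.ilinux.io/v2beta1 with the Aggregator; the backend APIServer for this endpoint is exposed by the service auth-api in the default namespace. In other words, client requests for resources under auth.ilinux.io/v2beta1 are routed to the auth-api service in the default namespace.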

    apiVersion: apiregistration.k8s.io/v1
    kind: APIService
    metadata:
      name: v2beta1.auth.ilinux.io
    spec:
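      insecureSkipTLSVerify: true   # whether to skip TLS verification, i.e. not verify the backend's HTTPS certificate; true = skip, false = verify
      group: auth.ilinux.io         # API group the resource types belong to
      version: v2beta1              # version within that group
      groupPriorityMinimum: 1000    # priority of the group
      versionPriority: 15           # priority of this version within the group
      service:                      # service of the backend APIServer that serves requests for this group and version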
        name: auth-api
        namespace: default
    
  • Kube APIServer, which handles the built-in Kubernetes core resources such as Deployment and ConfigMap.

  • API Extension Server, which handles user-defined resource types (CustomResourceDefinitions).
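To make the routing idea concrete, here is a minimal Go sketch of how an APIService like the one above maps a request path to a backend service. The `apiService` type and the `lookupBackend` function are hypothetical and model only the `group`, `version` and `service` fields; the real Aggregator Server also takes priorities, availability and TLS settings into account.

```go
package main

import (
	"fmt"
	"strings"
)

// apiService is a hypothetical, trimmed-down view of an APIService object:
// only the fields needed to pick a backend are kept.
type apiService struct {
	Group            string
	Version          string
	ServiceNamespace string
	ServiceName      string
}

// lookupBackend returns "namespace/name" of the backend service for a path of the form
// /apis/<group>/<version>/..., or ok == false if no APIService claims that group/version
// (such a request would be delegated to the Kube APIServer instead).
func lookupBackend(services []apiService, path string) (backend string, ok bool) {
	parts := strings.Split(strings.TrimPrefix(path, "/"), "/")
	if len(parts) < 3 || parts[0] != "apis" {
		return "", false
	}
	for _, s := range services {
		if s.Group == parts[1] && s.Version == parts[2] {
			return s.ServiceNamespace + "/" + s.ServiceName, true
		}
	}
	return "", false
}

func main() {
	services := []apiService{{Group: "auth.ilinux.io", Version: "v2beta1", ServiceNamespace: "default", ServiceName: "auth-api"}}
	fmt.Println(lookupBackend(services, "/apis/auth.ilinux.io/v2beta1/users")) // default/auth-api true
	fmt.Println(lookupBackend(services, "/api/v1/pods"))
}
```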

Request Flow

The figure shows how a request is processed across the three components:

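  • A user request first reaches the Aggregator Server. Based on the requested resource and the routes defined in APIService objects, the Aggregator Server decides whether to forward the request to the backend service defined in a route, that is, to the corresponding Aggregator APIServer. If it does not forward the request to an Aggregator APIServer, it delegates the request to the Kube APIServer.
  • The request reaches the Kube APIServer. The Kube APIServer handles requests for built-in API resources itself and delegates requests for non-built-in resources further to the API Extension Server.
  • The request reaches the API Extension Server. Any request the API Extension Server cannot handle is delegated to the notFoundHandler.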

API Server Source Code Overview

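At its core, the API Server is a web server that implements a RESTful API. It uses the go-restful framework, but mainly for its routing. Most of the API Server's startup work therefore consists of building handlers and registering them one after another, so that when a request arrives, the registered handlers are invoked in sequence to process it.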

Overall Flow of the Three Servers

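The overall flow still follows the figure above, but the code builds it from the bottom up: a delegate target must be constructed first before a new server can be built on top of it. To delegate means to entrust: in the flow above, a request the Aggregator Server cannot handle is entrusted (delegated) to the Kube APIServer.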

The relevant code is shown below:

  • apiExtensionsServer, kubeAPIServer and aggregatorServer are created in that order and chained together through delegation.
// cmd/kube-apiserver/app/server.go
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)

	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}

Taking createAggregatorServer as an example (the other two servers are created in a similar way):

  • createAggregatorServer() calls NewWithDelegate().
  • NewWithDelegate() calls c.GenericConfig.New("kube-aggregator", delegationTarget) to create the Aggregator Server, with kubeAPIServer as its delegate. When createAggregatorServer() is called, it receives kubeAPIServer.GenericAPIServer; the parameter name delegateAPIServer makes it clear that kubeAPIServer becomes the delegate of aggregatorServer.
// cmd/kube-apiserver/app/aggregator.go
func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
	aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
	...
}

// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
	...
}

Overall Handler Flow

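The figure below shows the detailed processing of a request inside the API Server. This raises a question: are the handlers for the common processing steps below included in every server, or only in the first one (that is, only in the Aggregator Server)?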

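Building on the above, let's follow the call chain of c.GenericConfig.New, i.e., the detailed process of creating the Aggregator Server. New calls NewAPIServerHandler(), which uses the go-restful library:

  • First, it creates a go-restful container object. A container is an HTTP handler that receives requests from HTTP clients, routes them, and calls the handler bound to the matched route. By using several containers, a developer can run multiple independent HTTP servers in one application, each served on a different port and each with its own set of services (WebServices), i.e., its own routes.
  • Then it creates the corresponding APIServerHandler object.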
// vendor/k8s.io/apiserver/pkg/server/config.go
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	...

	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
	...
}

// vendor/k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	nonGoRestfulMux := mux.NewPathRecorderMux(name)
	if notFoundHandler != nil {
		nonGoRestfulMux.NotFoundHandler(notFoundHandler)
	}

	gorestfulContainer := restful.NewContainer()
	gorestfulContainer.ServeMux = http.NewServeMux()
	gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
	gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
		logStackOnRecover(s, panicReason, httpWriter)
	})
	gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
		serviceErrorHandler(s, serviceErr, request, response)
	})

	director := director{
		name:               name,
		goRestfulContainer: gorestfulContainer,
		nonGoRestfulMux:    nonGoRestfulMux,
	}

	return &APIServerHandler{
		FullHandlerChain:   handlerChainBuilder(director),
		GoRestfulContainer: gorestfulContainer,
		NonGoRestfulMux:    nonGoRestfulMux,
		Director:           director,
	}
}

The APIServerHandler object implements ServeHTTP(), so it is an http.Handler. Its ServeHTTP() simply calls FullHandlerChain.ServeHTTP().

func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.FullHandlerChain.ServeHTTP(w, r)
}

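FullHandlerChain is created by handlerChainBuilder(), which by default calls DefaultBuildHandlerChain(). Each statement in DefaultBuildHandlerChain() wraps the handler built so far, so when FullHandlerChain.ServeHTTP() is called, the filters run from the bottom of the function to the top; the resulting order matches the figure above. The apiHandler at the center of the chain, which runs last, is the director passed in.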

// vendor/k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "authorization")

	if c.FlowControl != nil {
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, "audit")

	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = filterlatency.TrackStarted(handler, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	if c.ShutdownSendRetryAfter {
		handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
	}
	handler = genericfilters.WithHTTPLogging(handler)
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
		handler = genericapifilters.WithTracing(handler, c.TracerProvider)
	}
	handler = genericapifilters.WithLatencyTrackers(handler)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditID(handler)
	return handler
}

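Next, look at director's ServeHTTP() method, shown below:

  • It first tries goRestfulContainer to route the request. From the code above, goRestfulContainer is the Aggregator Server's own HTTP handler.
  • If none of the WebServices registered in goRestfulContainer claims the request path, it calls nonGoRestfulMux.ServeHTTP() instead.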
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	path := req.URL.Path

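	// check whether one of the WebServices registered in goRestfulContainer claims this path
	for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
		if ... { // path is claimed by ws (matching against ws.RootPath() elided)
			d.goRestfulContainer.Dispatch(w, req)
			return
		}
	}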

	// if we didn't find a match, then we just skip gorestful altogether
	klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
	d.nonGoRestfulMux.ServeHTTP(w, req)
}

Next, nonGoRestfulMux. As the code above shows, nonGoRestfulMux wraps notFoundHandler: a request that matches none of the paths registered on it ends up in notFoundHandler.ServeHTTP().

func (m *PathRecorderMux) NotFoundHandler(notFoundHandler http.Handler) {
	...
	m.notFoundHandler = notFoundHandler
}

func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	...
	h.notFoundHandler.ServeHTTP(w, r)
}

So what is notFoundHandler? From the code above, New() passes delegationTarget.UnprotectedHandler(), shown below:

  • It returns the delegationTarget's Director. The delegationTarget of the Aggregator Server is the Kube APIServer, so this director is the Kube APIServer's director.
  • The Kube APIServer's director works the same way as the Aggregator Server's director described above: it first tries the Kube APIServer's own GoRestfulContainer.
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
	return s.Handler.Director
}

By the same reasoning, a request is processed in the API Server in the following order:

  1. The common handlers (authorization, authentication, rate limiting, etc.).
  2. The Aggregator Server's HTTP handler.
  3. The Kube APIServer's HTTP handler.
  4. The Extension Server's HTTP handler.
  5. The real notFound handler.

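We can now answer the question above: authentication, authorization, rate limiting and the other common steps run at the very beginning of request processing, before the Aggregator Server's handler, and they run only once. The reason is that each server passes the server above it only its director (through UnprotectedHandler()), not its full handler chain.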

Handler Registration for Kubernetes Built-in Core Resources

Kubernetes built-in core resources are handled by the Kube APIServer, so we start from where the Kube APIServer is created:

func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	...
	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	...
}

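The part of New() that registers handlers for core resources works as follows:

  • First, it creates a series of providers, such as legacyRESTStorageProvider and noderest.RESTStorageProvider. As the names suggest, they correspond to core resource types (now we're getting somewhere).
  • Then it calls InstallAPIs() to register them based on these providers.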
// pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	if err != nil {
		return nil, err
	}
	...
	legacyRESTStorageProvider, err := corerest.New(corerest.Config{
		GenericConfig: corerest.GenericConfig{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			EventTTL:                    c.ExtraConfig.EventTTL,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
			Informers:                   c.ExtraConfig.VersionedInformers,
		},
		Proxy: corerest.ProxyConfig{
			Transport:           c.ExtraConfig.ProxyTransport,
			KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
		},
		Services: corerest.ServicesConfig{
			ClusterIPRange:          c.ExtraConfig.ServiceIPRange,
			SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
			NodePortRange:           c.ExtraConfig.ServiceNodePortRange,
			IPRepairInterval:        c.ExtraConfig.RepairServicesInterval,
		},
	})
	if err != nil {
		return nil, err
	}

	restStorageProviders := []RESTStorageProvider{
		legacyRESTStorageProvider,
		apiserverinternalrest.StorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
		resourcerest.RESTStorageProvider{},
	}
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	...
}

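InstallAPIs() iterates over the providers created above. For each one:

  • It first calls the provider's NewRESTStorage() method to create the RESTStorage for that group's resources. A RESTStorage maps each resource's access path to operations on its backend storage; the Kube APIServer creates one for every resource.
  • Then resources in the core group are registered via InstallLegacyAPIGroup(), and resources in non-core groups via InstallAPIGroups(). Both methods add the operations (Get/Update/Delete, etc.) and routes for each resource and map them to the handlers they create, thereby exposing the RESTful API.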
// pkg/controlplane/instance.go
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	nonLegacy := []*genericapiserver.APIGroupInfo{}
	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q: %w", groupName, err)
		}

		if len(groupName) == 0 {
			if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
				return fmt.Errorf("error in registering legacy API: %w", err)
			}
		} else {
			nonLegacy = append(nonLegacy, &apiGroupInfo)
		}
	}

	if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

const (
	// DefaultLegacyAPIPrefix is where the legacy APIs will be located.
	DefaultLegacyAPIPrefix = "/api"

	// APIGroupPrefix is where non-legacy API group will be located.
	APIGroupPrefix = "/apis"
)
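The split between `/api` and `/apis` can be reproduced with the standard library. A minimal sketch, assuming the hypothetical helper `groupVersionPrefix`, which combines the empty-group check in `InstallAPIs` with the `path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)` call made later in `InstallREST`:

```go
package main

import (
	"fmt"
	"path"
)

// Same values as the constants above.
const (
	DefaultLegacyAPIPrefix = "/api"
	APIGroupPrefix         = "/apis"
)

// groupVersionPrefix treats an empty group name as the legacy core group (installed
// under /api) and joins root, group and version. path.Join ignores empty elements,
// so the core group collapses to /api/v1.
func groupVersionPrefix(group, version string) string {
	root := APIGroupPrefix
	if len(group) == 0 {
		root = DefaultLegacyAPIPrefix
	}
	return path.Join(root, group, version)
}

func main() {
	fmt.Println(groupVersionPrefix("", "v1"))      // /api/v1
	fmt.Println(groupVersionPrefix("apps", "v1"))  // /apis/apps/v1
	fmt.Println(groupVersionPrefix("batch", "v1")) // /apis/batch/v1
}
```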

Next, using the registration of core-group resources as an example, we trace NewRESTStorage() and InstallLegacyAPIGroup() in turn; other resources are registered in a similar way.

NewRESTStorage

In NewRESTStorage():

  • It first creates the storage for each resource, such as PodStorage.
  • It then assigns the corresponding rest.Storage object to each path.
func (c *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
	...
	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.Proxy.Transport,
		podDisruptionClient,
	)
	...
	storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
	if storage == nil {
		storage = map[string]rest.Storage{}
	}
	...
	if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
		storage[resource] = podStorage.Pod
		storage[resource+"/attach"] = podStorage.Attach
		storage[resource+"/status"] = podStorage.Status
		storage[resource+"/log"] = podStorage.Log
		storage[resource+"/exec"] = podStorage.Exec
		storage[resource+"/portforward"] = podStorage.PortForward
		storage[resource+"/proxy"] = podStorage.Proxy
		storage[resource+"/binding"] = podStorage.Binding
		if podStorage.Eviction != nil {
			storage[resource+"/eviction"] = podStorage.Eviction
		}
		storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
	}
	...

	if len(storage) > 0 {
		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
	}

	return apiGroupInfo, nil
}

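Taking the Pod resource as an example, let's first look at how PodStorage is created.

  • The rest.Storage objects in PodStorage, such as PodStorage.Pod and PodStorage.Binding, all wrap the same store (or a shallow copy of it, such as statusStore, that differs only in its strategies), so the core object is store.
  • store is first created as a struct literal, and CompleteWithOptions() then fills in the rest of it according to options. The most important part is creating e.Storage.Storage: following opts.Decorator() shows that, with the watch cache enabled (the default), it creates a Cacher object. The Cacher interacts with etcd and caches the Pod-related data stored in etcd. Likewise, a Cacher is created for every other resource type.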
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

	store := &genericregistry.Store{
		...
	}
	options := &generic.StoreOptions{
		RESTOptions: optsGetter,
		AttrFunc:    registrypod.GetAttrs,
		TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
		Indexers:    registrypod.Indexers(),
	}
	if err := store.CompleteWithOptions(options); err != nil {
		return PodStorage{}, err
	}

	statusStore := *store
	statusStore.UpdateStrategy = registrypod.StatusStrategy
	statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
	ephemeralContainersStore := *store
	ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy

	bindingREST := &BindingREST{store: store}
	return PodStorage{
		Pod:                 &REST{store, proxyTransport},
		Binding:             &BindingREST{store: store},
		LegacyBinding:       &LegacyBindingREST{bindingREST},
		Eviction:            newEvictionStorage(&statusStore, podDisruptionBudgetClient),
		Status:              &StatusREST{store: &statusStore},
		EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
		Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
		Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
		Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
		Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
		PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
	}, nil
}

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	...
	if e.Storage.Storage == nil {
		...
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
		...
	}

	return nil
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
	...
	watchCache *watchCache
	reflector  *cache.Reflector
}
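The idea behind Cacher, serving reads from memory and keeping that memory in sync through watch events, can be illustrated with a toy type. This is not the real `Cacher`: the `cacher` and `event` types are hypothetical, there is no resourceVersion handling, and events are applied directly instead of arriving from a Reflector watching etcd. The key `/registry/pods/default/nginx` is illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// eventType and event are hypothetical stand-ins for watch events; only key and value are modeled.
type eventType int

const (
	added eventType = iota
	modified
	deleted
)

type event struct {
	Type  eventType
	Key   string
	Value string
}

// cacher applies watch events to an in-memory map and answers reads from it.
type cacher struct {
	mu    sync.RWMutex
	items map[string]string
}

func newCacher() *cacher { return &cacher{items: map[string]string{}} }

// apply processes one watch event, keeping the cache consistent with the backing store.
func (c *cacher) apply(e event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch e.Type {
	case added, modified:
		c.items[e.Key] = e.Value
	case deleted:
		delete(c.items, e.Key)
	}
}

// get reads from the cache only, without touching the backing store.
func (c *cacher) get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[key]
	return v, ok
}

func main() {
	c := newCacher()
	key := "/registry/pods/default/nginx"
	for _, e := range []event{
		{Type: added, Key: key, Value: "Pending"},
		{Type: modified, Key: key, Value: "Running"},
		{Type: deleted, Key: key},
	} {
		c.apply(e)
		fmt.Println(c.get(key))
	}
}
```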

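Next, look at the rest.Storage object set for each path, taking storage[resource] = podStorage.Pod as an example:

  • podStorage.Pod is a REST, which embeds *genericregistry.Store and therefore inherits genericregistry.Store's methods.
  • genericregistry.Store provides Get, Update and other methods, all of which ultimately access Etcd through e.Storage. For resources backed by a Cacher, reads may be served from the Cacher instead (depending, among other things, on the resourceVersion in the request).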
// pkg/registry/core/pod/storage/storage.go
type REST struct {
	*genericregistry.Store
	proxyTransport http.RoundTripper
}

// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	...
	if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	...
	return obj, nil
}
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	...
}
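The embedding relationship, and the interface check that registerResourceHandlers relies on later, can be shown in a few lines of Go. `Store`, `Getter` and `REST` here are hypothetical simplifications; in particular, `Getter` does not have the real `rest.Getter` signature, which takes a context and options and returns a `runtime.Object`.

```go
package main

import "fmt"

// Store is a hypothetical stand-in for genericregistry.Store with a single method.
type Store struct{ prefix string }

// Get returns a fake object path; the real Store.Get reads through e.Storage.
func (s *Store) Get(name string) (string, error) { return s.prefix + "/" + name, nil }

// Getter resembles rest.Getter only in spirit.
type Getter interface {
	Get(name string) (string, error)
}

// REST embeds *Store, so Store's methods are promoted and REST satisfies Getter
// without declaring Get itself.
type REST struct {
	*Store
	proxyTransport string // placeholder for the real http.RoundTripper field
}

func main() {
	var storage interface{} = &REST{Store: &Store{prefix: "/registry/pods"}}
	getter, isGetter := storage.(Getter) // the kind of check registerResourceHandlers performs
	fmt.Println(isGetter)                // true
	obj, _ := getter.Get("default/nginx")
	fmt.Println(obj) // /registry/pods/default/nginx
}
```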

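In summary, the ultimate purpose of NewRESTStorage() is to establish the mapping between paths and their rest.Storage objects, all of which are built on the same store (the status path uses a shallow copy of it). For example: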

storage["pods"] = podStorage.Pod = &REST{store, proxyTransport}
storage["pods"+"/attach"] = podStorage.Attach = &podrest.AttachREST{Store: store, KubeletConn: k}
storage["pods"+"/status"] = podStorage.Status = &StatusREST{store: &statusStore}
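The difference between sharing `store` and copying it (`statusStore := *store` in `NewStorage`) can be sketched as follows. The types are hypothetical; a map plays the role of the shared backend, which a shallow struct copy keeps pointing at the same data while the strategy field can diverge.

```go
package main

import "fmt"

// store is a hypothetical miniature of genericregistry.Store: a strategy plus a handle
// to the backend (a map here; a Cacher or etcd storage in the real code).
type store struct {
	UpdateStrategy string
	Backend        map[string]string
}

// Hypothetical REST wrappers, similar in spirit to REST and StatusREST.
type podREST struct{ store *store }
type statusREST struct{ store *store }

func main() {
	s := &store{UpdateStrategy: "pod", Backend: map[string]string{}}

	// shallow copy: the map is shared, UpdateStrategy can change independently
	statusStore := *s
	statusStore.UpdateStrategy = "podStatus"

	storage := map[string]interface{}{
		"pods":        &podREST{store: s},
		"pods/status": &statusREST{store: &statusStore},
	}

	s.Backend["default/nginx"] = "Running" // write through the main store
	status := storage["pods/status"].(*statusREST)
	fmt.Println(status.store.UpdateStrategy, status.store.Backend["default/nginx"]) // podStatus Running
	fmt.Println(storage["pods"].(*podREST).store.UpdateStrategy)                   // pod
}
```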

InstallLegacyAPIGroup

The main job of InstallLegacyAPIGroup is to create routes on top of the RESTStorage, attach handlers to them, and finally register all of this in the Kube APIServer's GoRestfulContainer. Note that each groupVersion corresponds to one WebService.

func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	...
	if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
		return err
	}
	...
	return nil
}

func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeConverter managedfields.TypeConverter) error {
	var resourceInfos []*storageversion.ResourceInfo
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		...
		apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		...
		discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		...
	}
	...
	return nil
}

func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}

	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	...
	container.Add(ws)
	...
}

func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	...
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		...
	}
	return apiResources, resourceInfos, ws, errors
}

For the Pod example above, registerResourceHandlers is called with:

registerResourceHandlers("pods", podStorage.Pod, ws)

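registerResourceHandlers() is fairly complex, so here is a brief walkthrough based on this concrete example:

  • First, it performs type assertions on the passed-in podStorage.Pod, for example checking whether it implements the rest.Getter interface.
  • Then it creates the corresponding actions. An action holds the operation information needed to create the routes and handlers that follow; each action corresponds to a standard REST method.
  • Next, it processes each action: it creates the handler and then binds the route to it.
  • Finally, it adds the routes to the WebService.

Repeating this for every path completes the registration of all routes for the pod resource.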

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	resource, subresource, err := splitSubresource(path)

	getter, isGetter := storage.(rest.Getter)
	switch {
	...
	default:
		itemPath := namespacedPath + "/{name}"
		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
	}

	for _, action := range actions {
		...
		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, reqScope)
			}

			if needOverride {
				handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
			} else {
				handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
			}
			handler = utilwarning.AddWarningsHandler(handler, warnings)

			route := ws.GET(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			routes = append(routes, route)
			...
		}

		for _, route := range routes {
			...
			ws.Route(route)
		}
	}
	...
}

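Summary

  • The Kube APIServer builds a Cacher for each resource type, and each Cacher stays consistent with Etcd through Etcd's watch mechanism. Request handling for each resource can therefore be thought of as a separate pipeline, and all of these pipelines follow the same pattern.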

  • Author: dawnguo
  • Link: /archives/251
  • Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting.