API Server 中的三个 Server
Kubernetes 原生 API Server 包含了三个 Server:
-
Aggregator Server,拦截 APIService 中定义的资源类型的请求,将其转发给指定的
Aggregator APIServer
处理。APIService 定义了 Aggregator Server 上要添加的路由信息,该路由信息的大致意思就是将对某个资源的访问路由至对应的 Aggregator Server。如下所示,该例子表示在 Aggregator上注册了 auth.ilinux.io/v2beta1 这个端点,该端点对应的后端 APIServer 的 service 是 default 命名空间下的 auth-api,即客户端访问 auth.ilinux.io/v2beta1 下的资源都会被路由至 default 命名空间下的 auth-api service 进行响应。
apiVersion: apiregistration.k8s.io/v1 kind: APIService metadata: name: v2beta1.auth.ilinux.io spec: insecureSkipTLSVerify: true # 是否忽略安全验证,即不验证 https 证书;true 表示不验证,false 表示要验证 group: auth.ilinux.io # 资源类型所属 group version: v2beta1 # 资源类型所属 group 的版本 groupPriorityMinimum: 1000 # group 的优先级 versionPriority: 15 # group 版本的优先级 service: # 对应 group 及其版本中资源的请求路由的后端 APIServer 关联的 service name: auth-api namespace: default
-
Kube APIServer,用于处理 Kubernetes 的内建核心资源,如:Deployment,ConfigMap 等。
-
API Extension Server,负责处理用户自定义资源类型。
请求流程
一个请求在三个组件中处理的流程如图所示:
- 用户请求首先到达 Aggregator Server,Aggregator Server 根据用户请求的资源,以及 APIService 中定义的路由信息,先考虑是否要将请求转发给路由中定义的后端 service,也就是是否要转发给相应的 Aggregator APIServer。如果不考虑将该请求转发给 Aggregator APIServer 的话,则将请求 delegate 给 Kube APIServer 进行处理。
- 请求来到 Kube APIServer。对于内建 API 资源的请求,Kube APIServer 会进行处理,而对于非内建 API 资源的请求,则将该请求继续 delegate 给 API Extension Server。
- 请求来到 API Extension Server,对于 API Extension Server 不能处理的请求来说,它会将请求 delegate 给 notFoundHandler。
API Server 整体源码
API Server 的本质是一个实现了 RESTful API 的 WebServer,它使用了 go-restful 框架,但是主要是使用了 go-restful 框架中的路由功能。因此,API Server 的启动过程中,主要的工作就是构建 handler,并依次注册各种 handler。这样当收到请求之后,会依次顺序调用注册的 handler 对请求进行处理。
三个 Server 的整体流程
整体的流程也还是参考这张图,但是在代码实现上,它是从下往上实现的。也就相当于只有先构建了 delegate target,才能在此基础之上继续构建新的 server。delegate 的中文意思是授权、委托,针对上述流程来说,也就是 Aggregator Server 无法处理的请求,则将请求委托给 Kube APIServer 进行处理。
相关代码实现如下所示,
- 可以看到依次创建了 apiExtensionsServer、kubeAPIServer 和 aggregatorServer,并通过 delegation 的方式将它们串起来。
// cmd/kube-apiserver/app/server.go
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}
下面以 createAggregatorServer 为例,其他也是类似的。
- createAggregatorServer() 方法会调用 NewWithDelegate() 方法。
- NewWithDelegate() 方法会调用
c.GenericConfig.New("kube-aggregator", delegationTarget)
,创建 Aggregator Server,同时将 kubeAPIServer 作为自己的 deletgate 对象。在调用 createAggregatorServer() 方法的时候,传入了 kubeAPIServer.GenericAPIServer,通过参数名 delegateAPIServer 可以判断传入的 kubeAPIServer 将作为 aggregatorServer 的 deletgate 对象。
// cmd/kube-apiserver/app/aggregator.go
func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
...
}
// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
...
}
Handler 的整体流程
如下所示,是一个请求在 API Server 中的详细处理流程。会有个问题,下述通用处理流程对应的 handler 是每个 Server 都包含,还是在最开始的 Server 中包含(也就是 Aggregator Server 中才有)?
在上述的基础之上,继续跟踪 c.GenericConfig.New
的函数调用链,也就是创建 Aggregator Server 的详细过程。可以看到 New 函数会调用 NewAPIServerHandler() 函数,在这个函数中会用到 go-restful 库,
- 首先创建 go-restful 中的 container 对象。container 是一个 HTTP 处理器,负责监听和处理来自 HTTP 客户端的请求,并对请求进行路由,调用该路由对应的 handler。通过使用不同的 container,开发者可以在一个应用程序中创建多个独立的 HTTP 服务器,每个服务器监听不同的端口,并且有不同的服务集合(WebService),也就是相当于可以有不同的路由等。
- 然后创建对应的 APIServerHandler 对象。
// vendor/k8s.io/apiserver/pkg/server/config.go
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
...
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
...
}
// vendor/k8s.io/apiserver/pkg/server/handler.go
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
APIServerHandler 对象实现了 ServeHTTP() 方法,是一个 Handler。在这个 Handler 中首先调用了 FullHandlerChain 的 ServeHTTP() 方法。
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.FullHandlerChain.ServeHTTP(w, r)
}
FullHandlerChain 由 handlerChainBuilder() 函数创建,而 handlerChainBuilder() 其实就是 DefaultBuildHandlerChain() 函数。因此如果调用 FullHandlerChain 的 ServeHTTP() 方法,那么会从下往上依次调用 DefaultBuildHandlerChain() 中的 handler,可以看到相应功能 hanlder 调用的顺序符合上图的顺序。而最终调用的 apiHandler 其实就是传入的 director。
// vendor/k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "authorization")
if c.FlowControl != nil {
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, "impersonation")
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
handler = filterlatency.TrackStarted(handler, "audit")
failedHandler := genericapifilters.Unauthorized(c.Serializer)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
failedHandler = filterlatency.TrackCompleted(failedHandler)
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = filterlatency.TrackStarted(handler, "authentication")
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator)
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
if c.ShutdownSendRetryAfter {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.AfterShutdownDelayDuration.Signaled())
}
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
handler = genericapifilters.WithLatencyTrackers(handler)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericapifilters.WithMuxAndDiscoveryComplete(handler, c.lifecycleSignals.MuxAndDiscoveryComplete.Signaled())
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
handler = genericapifilters.WithAuditID(handler)
return handler
}
接下去继续看 director 的 ServeHTTP() 方法,如下所示,
- 调用 goRestfulContainer 对路由进行处理。而 goRestfulContainer 通过上面的代码可以知道它是 Aggregator Server 的 HTTP 处理器。
- 如果 goRestfulContainer 无法进行处理,则会调用 nonGoRestfulMux 的 ServeHTTP() 方法进行处理。
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
...
d.goRestfulContainer.Dispatch(w, req)
return
}
}
}
// if we didn't find a match, then we just skip gorestful altogether
klog.V(5).Infof("%v: %v %q satisfied by nonGoRestful", d.name, req.Method, path)
d.nonGoRestfulMux.ServeHTTP(w, req)
}
接下去继续看 nonGoRestfulMux,通过上述的代码可以看到 nonGoRestfulMux 是对 notFoundHandler 的包装,它最终是调用 notFoundHandler 的 ServerHTTP() 方法。
func (m *PathRecorderMux) NotFoundHandler(notFoundHandler http.Handler) {
...
m.notFoundHandler = notFoundHandler
}
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
...
h.notFoundHandler.ServeHTTP(w, r)
}
那么,notFoundHandler 是什么?通过上述的代码,可以看到 New() 函数传的是 delegationTarget.UnprotectedHandler()
,如下所示,
- 返回 delegationTarget 的 Director。Aggregator Server 的 delegationTarget 是 Kube APIServer,那么这个 director 就是 Kube APIServer 的 director。
- 再根据上述 Aggregator Server 中 Director 的处理逻辑,Kube APIServer 的 director 的处理逻辑也是类似的,也就是先调用 Kube APIServer 自己的 GoRestfulContainer HTTP 处理器进行处理。
func (s *GenericAPIServer) UnprotectedHandler() http.Handler {
return s.Handler.Director
}
以此类推,可以分析得到,一个请求在 API Server 中处理的顺序为,
- 通用的 Handler(鉴权、认证、限流等)。
- Aggregator Server 的 HTTP 处理器进行处理。
- Kube APIServer 的 HTTP 处理器进行处理。
- Extension Server 的 HTTP 处理器进行处理。
- 真正的 notfound。
现在可以回答一下上面的问题了,在整个请求的处理过程中,鉴权、认证、限流等流程会在开始的时候进行处理,它是在 Aggregator Server 之前被调用,并且只调用这么一次。
Kubernetes 内建核心资源的 Handler 注册
Kubernetes 内建核心资源是由 Kube APIServer 进行处理。因此,我们从 Kube APIServer 创建的时候开始看,如下所示,
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
...
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
...
}
New() 函数中关于核心资源 Handler 注册的流程如下所示,
- 首先创建一系列 Provider,比如 legacyRESTStorageProvider、noderest.RESTStorageProvider 等。从名字可以看到,这些都是核心资源的名字(看到了希望)。
- 之后调用 InstallAPIs() 方法基于 Provider 进行注册。
// pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
...
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
Proxy: corerest.ProxyConfig{
Transport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
NodePortRange: c.ExtraConfig.ServiceNodePortRange,
IPRepairInterval: c.ExtraConfig.RepairServicesInterval,
},
})
restStorageProviders := []RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
discoveryrest.StorageProvider{},
networkingrest.RESTStorageProvider{},
noderest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
appsrest.StorageProvider{},
admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
resourcerest.RESTStorageProvider{},
}
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
return nil, err
}
...
}
InstallAPIs() 方法会对上述创建的 Provider 进行遍历,在遍历的时候,
- 首先调用相应资源类型对应的 NewRESTStorage() 方法创建相应的 RESTStorage。RESTStorage 的作用就是将每种资源的访问路径及其后端存储的操作对应起来,Kube APIServer 会为每种资源都创建对应的 RESTStorage。
- 之后针对 core 组中的资源,会调用 InstallLegacyAPIGroup() 方法进行注册;针对非 core 组中的资源,会调用 InstallAPIGroups() 方法进行注册。这两个方法的主要作用就是为每种资源添加相应的操作(Get/Update/Delete 等)和路由,并将其与创建的 Handler 映射起来,从而实现对外提供 RESTful API。
// pkg/controlplane/instance.go
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
nonLegacy := []*genericapiserver.APIGroupInfo{}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if len(groupName) == 0 {
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
const (
// DefaultLegacyAPIPrefix is where the legacy APIs will be located.
DefaultLegacyAPIPrefix = "/api"
// APIGroupPrefix is where non-legacy API group will be located.
APIGroupPrefix = "/apis"
)
接下去以 core 组中的资源注册来分别跟踪 NewRESTStorage() 方法和 InstallLegacyAPIGroup() 方法,其他资源的注册也是类似的。
NewRESTStorage
NewRESTStorage() 方法中,
- 首先创建相应资源的 Storage,比如 PodStorage。
- 然后针对相应的路径,设置对应的 rest.Storage 对象。
func (c *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
...
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.Proxy.Transport,
podDisruptionClient,
)
...
storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
if storage == nil {
storage = map[string]rest.Storage{}
}
...
if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
storage[resource] = podStorage.Pod
storage[resource+"/attach"] = podStorage.Attach
storage[resource+"/status"] = podStorage.Status
storage[resource+"/log"] = podStorage.Log
storage[resource+"/exec"] = podStorage.Exec
storage[resource+"/portforward"] = podStorage.PortForward
storage[resource+"/proxy"] = podStorage.Proxy
storage[resource+"/binding"] = podStorage.Binding
if podStorage.Eviction != nil {
storage[resource+"/eviction"] = podStorage.Eviction
}
storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers
}
...
if len(storage) > 0 {
apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
}
return apiGroupInfo, nil
}
以 Pod 资源为例,先来看下创建 PodStorage 的过程。
- 可以看到 PodStorage 中 rest.Storage 对象,比如 PodStorage.Pod、PodStorage.Binding 这些都封装了 store,使用的是同一个 store,因此核心的是 store 这个对象。
- store 首先是创建了个结构体,然后调用 CompleteWithOptions() 方法根据 options 的配置,完善 store 中的内容。其中最主要的是 e.Storage.Storage 的创建,继续跟踪 opts.Decorator() 其实是会创建了一个 Cacher 对象。这个 Cacher 对象是跟 etcd 进行交互的,它会将 etcd 中跟 Pod 相关的资源都缓存到 Cacher 中。类似的,其他每种资源类型都会创建一个 Cacher 对象。
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
store := &genericregistry.Store{
...
}
options := &generic.StoreOptions{
RESTOptions: optsGetter,
AttrFunc: registrypod.GetAttrs,
TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
Indexers: registrypod.Indexers(),
}
if err := store.CompleteWithOptions(options); err != nil {
return PodStorage{}, err
}
statusStore := *store
statusStore.UpdateStrategy = registrypod.StatusStrategy
statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
ephemeralContainersStore := *store
ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy
bindingREST := &BindingREST{store: store}
return PodStorage{
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
LegacyBinding: &LegacyBindingREST{bindingREST},
Eviction: newEvictionStorage(&statusStore, podDisruptionBudgetClient),
Status: &StatusREST{store: &statusStore},
EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Proxy: &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
Exec: &podrest.ExecREST{Store: store, KubeletConn: k},
Attach: &podrest.AttachREST{Store: store, KubeletConn: k},
PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
}, nil
}
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
...
if e.Storage.Storage == nil {
...
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
opts.StorageConfig,
prefix,
keyFunc,
e.NewFunc,
e.NewListFunc,
attrFunc,
options.TriggerFunc,
options.Indexers,
)
...
}
return nil
}
// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
...
watchCache *watchCache
reflector *cache.Reflector
}
接着来看下相应路径设置的 rest.Storage 对象,以 storage[resource] = podStorage.Pod
为例,
- podStorage.Pod 对象继承了 *genericregistry.Store,也就继承了 genericregistry.Store 的方法。
- genericregistry.Store 提供了 Get、Update 等方法,而这些方法最终都是会访问 Etcd 的。针对使用了 Cacher 来说,读取的时候会优先访问 Cacher。
// pkg/registry/core/pod/storage/storage.go
type REST struct {
*genericregistry.Store
proxyTransport http.RoundTripper
}
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
}
...
return obj, nil
}
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
...
}
综上来看,NewRESTStorage() 方法的最终目的是建立了路径与对应 rest.Storage 对象之间的关系,这些 rest.Storage 对象使用的都是同一个 store。比如
storage["pods"] = podStorage.Pod = &REST{store, proxyTransport}
storage["pods"+"/attach"] = podStorage.Attach = &podrest.AttachREST{Store: store, KubeletConn: k}
storage["pods"+"/status"] = podStorage.Status = &StatusREST{store: &statusStore}
InstallLegacyAPIGroup
InstallLegacyAPIGroup 的主要作用就是在 RESTStorage 的基础之上,创建相应的路由并添加相应的 Handler,最终将这些内容注册到 Kube APIServer 的 GoRestfulContainer 中。可以看到每个 groupVersion 都对应一个 webservice。
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
...
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
return err
}
...
return nil
}
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, typeCo}}nverter managedfields.TypeConverter) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
...
apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
...
discoveryAPIResources, r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
...
}
}
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]apidiscoveryv2beta1.APIResourceDiscovery, []*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
...
container.Add(ws)
...
}
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
...
paths := make([]string, len(a.group.Storage))
var i int = 0
for path := range a.group.Storage {
paths[i] = path
i++
}
sort.Strings(paths)
for _, path := range paths {
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
...
}
return apiResources, resourceInfos, ws, errors
}
基于上述 Pod 的例子,这里给 registerResourceHandlers 传的内容如下所示,
registerResourceHandlers("pods", podStorage.Pod, ws)
由于 registerResourceHandlers() 方法比较复杂,基于上述具体的例子,进行简单讲解,
- 首先是对传入的 podStorage.Pod 进行类型断言,就是看是否实现了 rest.Getter 接口。
- 之后是创建对应的 action,其实就是包含了接下来要创建的路由和 handler 所需的操作信息,每一个 action 对应一个标准的 REST method。
- 接着是执行上述创建的 action,首先是生成 handler,然后将 route 和 handler 进行绑定。
- 最后是将 route 添加到 webservice 中。
以此类推,完成了针对 pod 资源的所有路由的注册。
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
resource, subresource, err := splitSubresource(path)
getter, isGetter := storage.(rest.Getter)
switch {
...
default:
itemPath := namespacedPath + "/{name}"
actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
}
for _, action := range actions {
...
switch action.Verb {
case "GET": // Get a resource.
var handler restful.RouteFunction
if isGetterWithOptions {
handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
} else {
handler = restfulGetResource(getter, reqScope)
}
if needOverride {
handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
} else {
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
}
handler = utilwarning.AddWarningsHandler(handler, warnings)
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
Writes(producedObject)
routes = append(routes, route)
...
}
for _, route := range routes {
...
ws.Route(route)
}
}
}
总结
- Kube APIServer 会为每种资源类型都构建相应的 Cacher,Cacher 通过 Etcd 的 watch 机制保持与 Etcd 中的数据一致。可以理解为每种资源的请求处理都是独立的流程,只是这些流程都是类似的。
相关资料
Kubernetes API Server handler 注册过程分析:https://cloudnative.to/blog/apiserver-handler-register/
kube-apiserver-处理流程:https://blog.tianfeiyu.com/source-code-reading-notes/kubernetes/kube_apiserver.html#kube-apiserver-%E5%A4%84%E7%90%86%E6%B5%81%E7%A8%8B
kubernetes apiserver原理概览:https://github.com/duyanghao/kubernetes-reading-notes/blob/master/core/api-server/overview.md#kubernetes-apiserver%E5%8E%9F%E7%90%86%E6%A6%82%E8%A7%88
API Server 代码走读:https://cncamp.notion.site/kube-apiserver-10d5695cbbb14387b60c6d622005583d