概述
client-go package 的源码结构如下所示,
$ tree -L 2 client-go
client-go
├── discovery # 提供 DiscoveryClient 类型的客户端。
├── dynamic # 提供 DynamicClient 类型的客户端。
├── informers # 提供 Kubernetes 内置资源的 informer 机制的实现。informer 机制会缓存 APIServer 中的资源,只有当资源对象发生变化的时候才收到通知,减少了客户端对 APIServer 的频繁访问。每种内置资源会有自己的 informer 实现,也是按照资源分组和版本来进行区分。
├── kubernetes # 提供 ClientSet 类型的客户端。
├── listers # 提供 Kubernetes 内置资源的只读客户端 lister。每种资源的 lister 都有 Get() 和 List() 方法,并且结果都是从缓存中读取的。
├── rest # 提供最基础的 RESTClient。
├── scale # 提供 ScalClient 客户端,用于扩缩容 Deployment,ReplicaSet 等资源对象。
├── tools # 提供常用方法,比如获取 kubeconfig 的方法;SharedInformer、Reflector、DealtFIFO 和 Indexer 等实现;提供缓存机制,减轻 apiserver 的负载等。
├── transport # 提供安全的 TCP 链接,支持 HTTP Stream。某些操作需要在客户端和容器之间传输二进制流,比如 exec、attach 等。该功能由内部的 spdy 包提供支持。
├── util # 提供常用的方法,例如 WorkQueue 工作队列,Certificate 证书管理等。
其中各模块的依赖关系如图所示,
-
每种资源的定义包含在 k8s.io/api 中。
-
k8s.io/api-machinery 提供了针对资源的分组与版本。
-
在资源定义、分组及版本的基础之上,提供了四种客户端,
-
RESTClient 是最基础的客户端,对 HTTP Request 进行了封装,实现了 RESTful 风格的 API,可以直接通过 RESTClient 提供的 RESTful 方法如 Get()、Put()、Post()、Delete() 进行交互。同时支持 Protobuf 和 JSON 格式数据,并且对内置资源和 CRDs 都支持。
ClientSet、DynamicClient 及 DiscoveryClient 客户端都是基于 RESTClient 实现的。
-
ClientSet 在 RESTClient 的基础上封装了对 Kubernetes 内置资源和版本的管理方法,每个资源都有一个客户端,而 ClientSet 则是多个客户端的集合。ClientSet 相当于预先实现每种资源及对应版本的操作,并且内部的数据都是结构化的。
ClientSet 只能处理 Kubernetes 内置资源,并且它是通过 client-gen 代码生成器自动生成的。
-
DynamicClient 与 ClientSet 最大的不同之处是,ClientSet 只能访问 Kubernetes 自带的资源(即 Client 集合内的资源),不能直接访问 CRD 自定义资源。DynamicClient 能够处理 Kubernetes 中的所有资源对象,包括 Kuberntes 内置资源与 CRD 自定义资源。它也封装了 RESTClient,通过指定 GVR 并结合 unstructured.Unstructured 类型实现了对所有资源对象的访问。
DynamicClient 没有使用 k8s.io/api 中定义的各种 API 资源的 Go 结构体,而是使用 unstructured.Unstructured 表示所有资源对象,用于处理非结构化数据结构。unstructured.Unstructured 类型嵌套了一个
map[string]inferface{}
值来表示 API 资源的内部结构,该结构和服务端的 REST 负载是相似的,因为服务端收到之后也是可以处理的。但是,需要注意的是,DynamicClient 只支持 Json 传输。DynamicClient 的使用会将所有数据绑定推迟到运行时,不是类型安全的。这种方式的好处在于当资源对象发生变化时,使用 DynamicClient 的程序不需要重新编译。但是,对于某些需要强数据类型检查和验证的应用程序来说,可能是一个问题。DynamicClient 的一个适用场景,就是当一个组件需要控制所有的 API 资源时,比如 garbage collector 和 namespace controller 中就使用了 DynamicClient。
-
DiscoveryClient 用于发现 kube-apiserver 所支持的资源组、资源版本、资源信息,即 GVR(Group、Versions、Resources)信息。kubectl api-versions 和 kubectl api-resources 命令输出都是通过 DisconversyClient 实现的。
-
-
而在四种客户端又提供了 informer/lister/tools 等模块。
RESTClient 使用
//参考path : /api/v1/namespace/{namespace}/pods
config.APIPath = "api"
//pod的group是空字符串
/*
// GroupName is the group name use in this package
const GroupName = ""
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
*/
config.GroupVersion = &corev1.SchemeGroupVersion
//指定序列化工具
config.NegotiatedSerializer = scheme.Codecs
//根据配置信息构建restClient示例
restClient,err := rest.RESTClientFor(config)
if err != nil{
fmt.Println("init restClient failed ! err: ",err)
panic(err.Error())
}
//保存pod结果的数据结构实例
result := &corev1.PodList{}
//指定namespace
namespace := "dev"
//设置请求参数,然后发起请求
//GET请求
err = restClient.Get().
// 指定namespace,参考path: /api/v1/namespace/{namespace}/pods
Namespace(namespace).
// 查找多个pod,参考path: /api/v1/namespace/{namespace}/pods
Resource("pods").
// 指定大小限制和序列化工具,限制指定返回结果条目为100条
// VersionedParams(&metav1.ListOptions{Limit:100}, scheme.ParameterCodec).
// 使用字段选择器选择只返回metadata.name为coredns-64dc4c69b-v8t4m的pod
// VersionedParams(&metav1.ListOptions{Limit:100,FieldSelector:"metadata.name=coredns-64dc4c69b- v8t4m"}, scheme.ParameterCodec).
// 使用字段选择器选择只返回状态(status.phase)为Running的pod
// VersionedParams(&metav1.ListOptions{Limit:100,FieldSelector:"status.phase=Running"}, scheme.ParameterCodec).
// 使用标签选择器选择标签k8s-app=kube-dns的pod
// VersionedParams(&metav1.ListOptions{Limit:100,LabelSelector:"k8s-app=kube-dns"}, scheme.ParameterCodec).
// 请求
Do(context.TODO()).
// 将结果存入result
Into(result)
Clientset 使用
type Clientset struct {
*authenticationv1beta1.AuthenticationV1beta1Client
*authorizationv1.AuthorizationV1Client
// ...
*corev1.CoreV1Client
}
//实例化一个clientset对象
clientset,err := kubernetes.NewForConfig(config)
if err != nil{
fmt.Println("init clientset failed ! err: ",err)
panic(err.Error())
}
//获取podClient客户端,corev1.NamespaceAll 为空字符串,实际如果为空字符串,那么拿到的是所有名称空间的pod资源
//podClient := clientset.CoreV1().Pods(corev1.NamespaceAll)
podClient := clientset.CoreV1().Pods("dev")
//使用podclient客户端,列出名称空间内所有pod资源
result,err := podClient.List(context.TODO(),metav1.ListOptions{})
if err != nil{
fmt.Println("podclient get pods failed! err: ",err)
panic(err.Error())
}
DynamicClient 使用
//实例化一个DynamicClient对象
dynamicClient,err := dynamic.NewForConfig(config)
if err != nil{
fmt.Println("init dynamicClient failed ! err: ",err)
panic(err.Error())
}
//dynamicClient的唯一关联方法所需的入参,GVR
gvr := schema.GroupVersionResource{Version: "v1",Resource: "pods"}
//使用dynamicClient的查询列表方法,查询指定namespace下的所有pod
//注意此方法返回的数据结构类型是UnstructuredList
unstructObjList,err := dynamicClient.
// Resource是dynamicClient唯一的一个方法,参数为gvr
Resource(gvr).
// 指定查询的namespace
Namespace("dev").
// 以list列表的方式查询
List(context.TODO(),metav1.ListOptions{Limit: 100})
if err != nil{
fmt.Println("dynamicClient list pods failed ! err :",err)
panic(err.Error())
}
//实例化一个PodList数据结构,用于接收从unstructObjList转换后的结果
result := &corev1.PodList{}
//转换
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObjList.UnstructuredContent(),result)
if err != nil{
fmt.Println("unstructured failed ! err: ",err)
panic(err.Error())
}
DiscoveryClient 使用
//实例化一个discoveryClient对象
discoveryClient,err := discovery.NewDiscoveryClientForConfig(config)
if err != nil{
fmt.Println("init discoveryClient failed ! err: ",err)
panic(err.Error())
}
//获取所有分组的资源数据
APIGroup,APIResourceListSlice,err := discoveryClient.ServerGroupsAndResources()
if err != nil{
fmt.Println("get apigroup resource failed! err: ",err)
panic(err.Error())
}
//先看Group信息
//fmt.Printf("APIGroup : \n\n %v\n\n\n\n",APIGroup)
// APIResourceListSlice是个切片,里面的每个元素代表一个GroupVersion及其资源
for _, singleAPIResourceList := range APIResourceListSlice {
// GroupVersion是个字符串,例如"apps/v1"
groupVerionStr := singleAPIResourceList.GroupVersion
// ParseGroupVersion方法将字符串转成数据结构
gv, err := schema.ParseGroupVersion(groupVerionStr)
if err != nil {
panic(err.Error())
}
fmt.Println("*****************************************************************")
fmt.Printf("GV string [%v]\nGV struct [%#v]\nresources :\n\n", groupVerionStr, gv)
// APIResources字段是个切片,里面是当前GroupVersion下的所有资源
for _, singleAPIResource := range singleAPIResourceList.APIResources {
fmt.Printf("%v\n", singleAPIResource.Name)
}
}
可以将 DiscoveryClient 发现的信息缓存到本地,当需要的时候先从本地读取,以减轻对 API Server 访问的压力。比如在 kubectl 中就这么使用了。kubectl 访问资源的时候,会用到 GVK/GVR 等相关信息,比如 CRD 等。在使用这些信息的时候,它会先访问本地是否有缓存。
// staging/src/k8s.io/kubectl/pkg/cmd/delete/delete.go // 有这么一段代码 dynamicClient, err := f.DynamicClient() if err != nil { return err } discoveryClient, err := f.ToDiscoveryClient() if err != nil { return err }
func (f *ConfigFlags) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) { config, err := f.ToRESTConfig() if err != nil { return nil, err } // defaultCacheDir = filepath.Join(homedir.HomeDir(), ".kube", "cache") cacheDir := defaultCacheDir if f.CacheDir != nil { cacheDir = *f.CacheDir } httpCacheDir := filepath.Join(cacheDir, "http") discoveryCacheDir := computeDiscoverCacheDir(filepath.Join(cacheDir, "discovery"), config.Host) return diskcached.NewCachedDiscoveryClientForConfig(config, discoveryCacheDir, httpCacheDir, time.Duration(10*time.Minute)) }
dynamicClient和DiscoveryClient结合使用案例
package main
import (
"context"
"flag"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/client-go/discovery"
memory "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"log"
"os"
"path/filepath"
)
//自动以数据
const metaCRD = `
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: my-nginx
name: my-nginx
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: my-nginx
template:
metadata:
creationTimestamp: null
labels:
app: my-nginx
spec:
containers:
- image: nginx
name: nginx
`
func homeDir() string{
if h := os.Getenv("HOME");h != ""{
return h
}
return os.Getenv("USERPROFILE") //windows的家目录
}
func GetK8sConfig()(config *rest.Config,err error){
//获取k8s rest config
var kubeconfig *string
// home是家目录,如果能取到家目录的值,就可以用来做默认值
if home := homeDir();home != ""{
// 如果输入了kubeconfig参数,该参数的值就是kubeconfig文件的路径,
// 如果没有输入kubeconfig参数,就用默认路径 ~/.kube/config
kubeconfig = flag.String("kubeconfig",filepath.Join(home,".kube","config"),"(optional) path to the kubeconfig file")
}else{
//如果取不到当前用户的家目录,就没有办法设置kubeconfig的默认目录了,只能从入参中取
kubeconfig = flag.String("kubeconfig","","(optional) path to the kubeconfig file")
}
flag.Parse()
// 从本机加载kubeconfig配置文件,因此第一个参数为空字符串
config, err = clientcmd.BuildConfigFromFlags("",*kubeconfig)
// kubeconfig 加载失败就直接退出
if err != nil{
fmt.Println("load kubeconfig failed!,err:",err)
panic(err.Error())
}
return
}
func GetGVRdyClient(gvk *schema.GroupVersionKind,namespace string)(dr dynamic.ResourceInterface,err error){
config,err := GetK8sConfig()
if err != nil{
panic(err.Error())
}
//创建discovery客户端
discoverClient,err := discovery.NewDiscoveryClientForConfig(config)
if err != nil{
panic(err.Error())
}
//获取GVK GVR映射,返回的是集群所有的GVK以及GVR映射
mapperGVRGVK := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoverClient))
// 根据资源GVK,获取资源的GVR GVK映射
resourceMapper,err := mapperGVRGVK.RESTMapping(gvk.GroupKind(),gvk.Version)
//fmt.Println("resourceMapper -----",resourceMapper.Scope.Name())
if err != nil{
panic(err.Error())
}
//创建动态客户端
dynamicClient ,err := dynamic.NewForConfig(config)
if err != nil{
panic(err.Error())
}
if resourceMapper.Scope.Name() == meta.RESTScopeNameNamespace{
// 获取gvr对应的动态客户端
dr = dynamicClient.Resource(resourceMapper.Resource).Namespace(namespace)
}else {
dr = dynamicClient.Resource(resourceMapper.Resource)
}
return
}
func main(){
var (
err error
objGET *unstructured.Unstructured
objCreate *unstructured.Unstructured
objUpdate *unstructured.Unstructured
gvk *schema.GroupVersionKind
dr dynamic.ResourceInterface
)
obj := &unstructured.Unstructured{}
_, gvk, err = yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode([]byte(metaCRD), nil, obj)
if err != nil {
panic(fmt.Errorf("failed to get GVK: %v", err))
}
dr,err = GetGVRdyClient(gvk,obj.GetNamespace())
if err != nil {
panic(fmt.Errorf("failed to get dr: %v", err))
}
//创建
objCreate,err = dr.Create(context.TODO(),obj,metav1.CreateOptions{})
if err != nil {
panic(fmt.Errorf("Create resource ERROR: %v", err))
//panic(err.Error())
}
log.Print("Create: : ",objCreate)
// 查询
objGET,err = dr.Get(context.TODO(),obj.GetName(),metav1.GetOptions{})
if err != nil {
panic(fmt.Errorf("select resource ERROR: %v", err))
}
log.Print("GET: ",objGET)
//更新
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
// 查询resource是否存在
result, getErr := dr.Get(context.TODO(),obj.GetName(),metav1.GetOptions{})
if getErr != nil {
panic(fmt.Errorf("failed to get latest version of : %v", getErr))
}
// 提取obj 的 spec 期望值
spec, found, err := unstructured.NestedMap(obj.Object, "spec")
if err != nil || !found || spec == nil {
panic(fmt.Errorf(" not found or error in spec: %v", err))
}
//将副本数修改为2个
spec["replicas"] = int64(2)
// 更新 存在资源的spec
if err := unstructured.SetNestedMap(result.Object, spec, "spec", ); err != nil {
fmt.Println("errrrrr\n")
panic(err)
}
fmt.Println("result-------------",result)
// 更新资源
objUpdate, err = dr.Update(context.TODO(),result,metav1.UpdateOptions{})
log.Print("update : ",objUpdate)
return err
})
if retryErr != nil {
panic(fmt.Errorf("update failed: %v", retryErr))
} else {
log.Print("更新成功")
}
//删除
err = dr.Delete(context.TODO(),obj.GetName(),metav1.DeleteOptions{})
if err != nil {
panic(fmt.Errorf("delete resource ERROR : %v", err))
} else {
log.Print("删除成功")
}
}
相关链接
-
client-go库使用-k8s四种客户端介绍:https://xieys.club/client-go/
-
复盘 kubernetes client-go:https://morven.life/posts/k8s-client-go/