程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 客户端-client go 介绍

发表于 2023-04-28 | 分类于 Kubernetes | 0 | 阅读次数 2107

概述

client-go package 的源码结构如下所示,

$ tree -L 2 client-go
client-go
├── discovery	# 提供 DiscoveryClient 类型的客户端。
├── dynamic  # 提供 DynamicClient 类型的客户端。
├── informers # 提供 Kubernetes 内置资源的 informer 机制的实现。informer 机制会缓存 APIServer 中的资源,只有当资源对象发生变化的时候才收到通知,减少了客户端对 APIServer 的频繁访问。每种内置资源会有自己的 informer 实现,也是按照资源分组和版本来进行区分。
├── kubernetes # 提供 ClientSet 类型的客户端。
├── listers # 提供 Kubernetes 内置资源的只读客户端 lister。每种资源的 lister 都有 Get() 和 List() 方法,并且结果都是从缓存中读取的。
├── rest # 提供最基础的 RESTClient。
├── scale # 提供 ScalClient 客户端,用于扩缩容 Deployment,ReplicaSet 等资源对象。
├── tools # 提供常用方法,比如获取 kubeconfig 的方法;SharedInformer、Reflector、DealtFIFO 和 Indexer 等实现;提供缓存机制,减轻 apiserver 的负载等。
├── transport # 提供安全的 TCP 链接,支持 HTTP Stream。某些操作需要在客户端和容器之间传输二进制流,比如 exec、attach 等。该功能由内部的 spdy 包提供支持。
├── util # 提供常用的方法,例如 WorkQueue 工作队列,Certificate 证书管理等。

其中各模块的依赖关系如图所示,

  • 每种资源的定义包含在 k8s.io/api 中。

  • k8s.io/api-machinery 提供了针对资源的分组与版本。

  • 在资源定义、分组及版本的基础之上,提供了四种客户端,

    • RESTClient 是最基础的客户端,对 HTTP Request 进行了封装,实现了 RESTful 风格的 API,可以直接通过 RESTClient 提供的 RESTful 方法如 Get()、Put()、Post()、Delete() 进行交互。同时支持 Protobuf 和 JSON 格式数据,并且对内置资源和 CRDs 都支持。

      ClientSet、DynamicClient 及 DiscoveryClient 客户端都是基于 RESTClient 实现的。

    • ClientSet 在 RESTClient 的基础上封装了对 Kubernetes 内置资源和版本的管理方法,每个资源都有一个客户端,而 ClientSet 则是多个客户端的集合。ClientSet 相当于预先实现每种资源及对应版本的操作,并且内部的数据都是结构化的。

      ClientSet 只能处理 Kubernetes 内置资源,并且它是通过 client-gen 代码生成器自动生成的。

    • DynamicClient 与 ClientSet 最大的不同之处是,ClientSet 只能访问 Kubernetes 自带的资源(即 Client 集合内的资源),不能直接访问 CRD 自定义资源。DynamicClient 能够处理 Kubernetes 中的所有资源对象,包括 Kuberntes 内置资源与 CRD 自定义资源。它也封装了 RESTClient,通过指定 GVR 并结合 unstructured.Unstructured 类型实现了对所有资源对象的访问。

      DynamicClient 没有使用 k8s.io/api 中定义的各种 API 资源的 Go 结构体,而是使用 unstructured.Unstructured 表示所有资源对象,用于处理非结构化数据结构。unstructured.Unstructured 类型嵌套了一个 map[string]inferface{} 值来表示 API 资源的内部结构,该结构和服务端的 REST 负载是相似的,因为服务端收到之后也是可以处理的。但是,需要注意的是,DynamicClient 只支持 Json 传输。

      DynamicClient 的使用会将所有数据绑定推迟到运行时,不是类型安全的。这种方式的好处在于当资源对象发生变化时,使用 DynamicClient 的程序不需要重新编译。但是,对于某些需要强数据类型检查和验证的应用程序来说,可能是一个问题。DynamicClient 的一个适用场景,就是当一个组件需要控制所有的 API 资源时,比如 garbage collector 和 namespace controller 中就使用了 DynamicClient。

    • DiscoveryClient 用于发现 kube-apiserver 所支持的资源组、资源版本、资源信息,即 GVR(Group、Versions、Resources)信息。kubectl api-versions 和 kubectl api-resources 命令输出都是通过 DisconversyClient 实现的。

  • 而在四种客户端又提供了 informer/lister/tools 等模块。

RESTClient 使用

//参考path : /api/v1/namespace/{namespace}/pods
config.APIPath = "api"
//pod的group是空字符串
/*
	// GroupName is the group name use in this package
	   const GroupName = ""
  // SchemeGroupVersion is group version used to register these objects
	   var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
*/
config.GroupVersion = &corev1.SchemeGroupVersion
//指定序列化工具
config.NegotiatedSerializer = scheme.Codecs

//根据配置信息构建restClient示例
restClient,err := rest.RESTClientFor(config)
if err != nil{
  fmt.Println("init restClient failed ! err: ",err)
  panic(err.Error())
}

//保存pod结果的数据结构实例
result := &corev1.PodList{}

//指定namespace
namespace := "dev"

//设置请求参数,然后发起请求

//GET请求
err = restClient.Get().
	// 指定namespace,参考path: /api/v1/namespace/{namespace}/pods
	Namespace(namespace).
	// 查找多个pod,参考path: /api/v1/namespace/{namespace}/pods
	Resource("pods").
	// 指定大小限制和序列化工具,限制指定返回结果条目为100条
	// VersionedParams(&metav1.ListOptions{Limit:100}, scheme.ParameterCodec).
	// 使用字段选择器选择只返回metadata.name为coredns-64dc4c69b-v8t4m的pod
	// VersionedParams(&metav1.ListOptions{Limit:100,FieldSelector:"metadata.name=coredns-64dc4c69b-	v8t4m"}, scheme.ParameterCodec).
	// 使用字段选择器选择只返回状态(status.phase)为Running的pod
	// VersionedParams(&metav1.ListOptions{Limit:100,FieldSelector:"status.phase=Running"}, scheme.ParameterCodec).
	// 使用标签选择器选择标签k8s-app=kube-dns的pod
	// VersionedParams(&metav1.ListOptions{Limit:100,LabelSelector:"k8s-app=kube-dns"}, scheme.ParameterCodec).
	// 请求
	Do(context.TODO()).
	// 将结果存入result
	Into(result)

Clientset 使用

type Clientset struct {
    *authenticationv1beta1.AuthenticationV1beta1Client
    *authorizationv1.AuthorizationV1Client
    // ...
    *corev1.CoreV1Client
}
//实例化一个clientset对象
clientset,err := kubernetes.NewForConfig(config)
if err != nil{
	fmt.Println("init clientset failed ! err: ",err)
	panic(err.Error())
}

//获取podClient客户端,corev1.NamespaceAll 为空字符串,实际如果为空字符串,那么拿到的是所有名称空间的pod资源
//podClient := clientset.CoreV1().Pods(corev1.NamespaceAll)
podClient := clientset.CoreV1().Pods("dev")

//使用podclient客户端,列出名称空间内所有pod资源
result,err := podClient.List(context.TODO(),metav1.ListOptions{})
if err != nil{
	fmt.Println("podclient get pods failed! err: ",err)
	panic(err.Error())
}

DynamicClient 使用

//实例化一个DynamicClient对象
dynamicClient,err := dynamic.NewForConfig(config)
if err != nil{
  fmt.Println("init dynamicClient failed ! err: ",err)
  panic(err.Error())
}

//dynamicClient的唯一关联方法所需的入参,GVR
gvr := schema.GroupVersionResource{Version: "v1",Resource: "pods"}

//使用dynamicClient的查询列表方法,查询指定namespace下的所有pod
//注意此方法返回的数据结构类型是UnstructuredList
unstructObjList,err := dynamicClient.
                       // Resource是dynamicClient唯一的一个方法,参数为gvr
                       Resource(gvr).
                       // 指定查询的namespace
                       Namespace("dev").
                       // 以list列表的方式查询
                       List(context.TODO(),metav1.ListOptions{Limit: 100})

if err != nil{
  fmt.Println("dynamicClient list pods failed ! err :",err)
  panic(err.Error())
}

//实例化一个PodList数据结构,用于接收从unstructObjList转换后的结果
result := &corev1.PodList{}

//转换
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObjList.UnstructuredContent(),result)
if err != nil{
  fmt.Println("unstructured failed ! err: ",err)
  panic(err.Error())
}

DiscoveryClient 使用

//实例化一个discoveryClient对象
discoveryClient,err := discovery.NewDiscoveryClientForConfig(config)
if err != nil{
  fmt.Println("init discoveryClient failed ! err: ",err)
  panic(err.Error())
}

//获取所有分组的资源数据
APIGroup,APIResourceListSlice,err := discoveryClient.ServerGroupsAndResources()
if err != nil{
  fmt.Println("get apigroup resource failed! err: ",err)
  panic(err.Error())
}
//先看Group信息
//fmt.Printf("APIGroup : \n\n %v\n\n\n\n",APIGroup)

// APIResourceListSlice是个切片,里面的每个元素代表一个GroupVersion及其资源
for _, singleAPIResourceList := range APIResourceListSlice {

  // GroupVersion是个字符串,例如"apps/v1"
  groupVerionStr := singleAPIResourceList.GroupVersion

  // ParseGroupVersion方法将字符串转成数据结构
  gv, err := schema.ParseGroupVersion(groupVerionStr)

  if err != nil {
    panic(err.Error())
  }

  fmt.Println("*****************************************************************")
  fmt.Printf("GV string [%v]\nGV struct [%#v]\nresources :\n\n", groupVerionStr, gv)

  // APIResources字段是个切片,里面是当前GroupVersion下的所有资源
  for _, singleAPIResource := range singleAPIResourceList.APIResources {
    fmt.Printf("%v\n", singleAPIResource.Name)
  }
}

可以将 DiscoveryClient 发现的信息缓存到本地,当需要的时候先从本地读取,以减轻对 API Server 访问的压力。比如在 kubectl 中就这么使用了。kubectl 访问资源的时候,会用到 GVK/GVR 等相关信息,比如 CRD 等。在使用这些信息的时候,它会先访问本地是否有缓存。

// staging/src/k8s.io/kubectl/pkg/cmd/delete/delete.go
// 有这么一段代码

dynamicClient, err := f.DynamicClient()
if err != nil {
return err
}
  discoveryClient, err := f.ToDiscoveryClient()
if err != nil {
return err
}
func (f *ConfigFlags) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
	config, err := f.ToRESTConfig()
	if err != nil {
		return nil, err
	}

// defaultCacheDir = filepath.Join(homedir.HomeDir(), ".kube", "cache")

  	cacheDir := defaultCacheDir

	if f.CacheDir != nil {
		cacheDir = *f.CacheDir
	}
	httpCacheDir := filepath.Join(cacheDir, "http")
	discoveryCacheDir := computeDiscoverCacheDir(filepath.Join(cacheDir, "discovery"), config.Host)

	return diskcached.NewCachedDiscoveryClientForConfig(config, discoveryCacheDir, httpCacheDir, time.Duration(10*time.Minute))
}

dynamicClient和DiscoveryClient结合使用案例

package main

import (
   "context"
   "flag"
   "fmt"
   "k8s.io/apimachinery/pkg/api/meta"
   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
   "k8s.io/apimachinery/pkg/runtime/schema"
   "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
   "k8s.io/client-go/discovery"
   memory "k8s.io/client-go/discovery/cached"
   "k8s.io/client-go/dynamic"
   "k8s.io/client-go/rest"
   "k8s.io/client-go/restmapper"
   "k8s.io/client-go/tools/clientcmd"
   "k8s.io/client-go/util/retry"
   "log"
   "os"
   "path/filepath"
)

//自动以数据
const metaCRD = `
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: my-nginx
  name: my-nginx
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-nginx
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: my-nginx
    spec:
      containers:
      - image: nginx
        name: nginx

`

func homeDir() string{
   if h := os.Getenv("HOME");h != ""{
      return h
   }
   return os.Getenv("USERPROFILE") //windows的家目录
}

func GetK8sConfig()(config *rest.Config,err error){
   //获取k8s rest config
   var kubeconfig *string

   // home是家目录,如果能取到家目录的值,就可以用来做默认值
   if home := homeDir();home != ""{
      // 如果输入了kubeconfig参数,该参数的值就是kubeconfig文件的路径,
      // 如果没有输入kubeconfig参数,就用默认路径 ~/.kube/config
      kubeconfig = flag.String("kubeconfig",filepath.Join(home,".kube","config"),"(optional)  path to the kubeconfig file")
   }else{
      //如果取不到当前用户的家目录,就没有办法设置kubeconfig的默认目录了,只能从入参中取
      kubeconfig = flag.String("kubeconfig","","(optional)  path to the kubeconfig file")
   }


   flag.Parse()

   // 从本机加载kubeconfig配置文件,因此第一个参数为空字符串
   config, err = clientcmd.BuildConfigFromFlags("",*kubeconfig)
   // kubeconfig 加载失败就直接退出
   if err != nil{
      fmt.Println("load kubeconfig failed!,err:",err)
      panic(err.Error())
   }
   return
}

func GetGVRdyClient(gvk *schema.GroupVersionKind,namespace string)(dr dynamic.ResourceInterface,err error){
   config,err := GetK8sConfig()
   if err != nil{
      panic(err.Error())
   }

   //创建discovery客户端
   discoverClient,err := discovery.NewDiscoveryClientForConfig(config)
   if err != nil{
      panic(err.Error())
   }

   //获取GVK GVR映射,返回的是集群所有的GVK以及GVR映射
   mapperGVRGVK := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoverClient))



   // 根据资源GVK,获取资源的GVR GVK映射

   resourceMapper,err := mapperGVRGVK.RESTMapping(gvk.GroupKind(),gvk.Version)
   //fmt.Println("resourceMapper  -----",resourceMapper.Scope.Name())
   if err != nil{
      panic(err.Error())
   }

   //创建动态客户端
   dynamicClient ,err := dynamic.NewForConfig(config)
   if err != nil{
      panic(err.Error())
   }

   if resourceMapper.Scope.Name() == meta.RESTScopeNameNamespace{
      // 获取gvr对应的动态客户端
      dr = dynamicClient.Resource(resourceMapper.Resource).Namespace(namespace)
   }else {
      dr = dynamicClient.Resource(resourceMapper.Resource)
   }
   return
}

func main(){
   var (
      err error
      objGET *unstructured.Unstructured
      objCreate *unstructured.Unstructured
      objUpdate *unstructured.Unstructured
      gvk *schema.GroupVersionKind
      dr dynamic.ResourceInterface
   )
   obj := &unstructured.Unstructured{}
   _, gvk, err = yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode([]byte(metaCRD), nil, obj)
   if err != nil {
      panic(fmt.Errorf("failed to get GVK: %v", err))
   }

   dr,err = GetGVRdyClient(gvk,obj.GetNamespace())
   if err != nil {
      panic(fmt.Errorf("failed to get dr: %v", err))
   }
   //创建
   objCreate,err = dr.Create(context.TODO(),obj,metav1.CreateOptions{})
   if err != nil {
      panic(fmt.Errorf("Create resource ERROR: %v", err))
      //panic(err.Error())
   }
   log.Print("Create: : ",objCreate)

   // 查询
   objGET,err = dr.Get(context.TODO(),obj.GetName(),metav1.GetOptions{})
   if err != nil {
      panic(fmt.Errorf("select resource ERROR: %v", err))
   }
   log.Print("GET: ",objGET)

   //更新
   retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
      // 查询resource是否存在
      result, getErr := dr.Get(context.TODO(),obj.GetName(),metav1.GetOptions{})
      if getErr != nil {
         panic(fmt.Errorf("failed to get latest version of : %v", getErr))
      }


      // 提取obj 的 spec 期望值
      spec, found, err := unstructured.NestedMap(obj.Object, "spec")

      if err != nil || !found || spec == nil {
         panic(fmt.Errorf(" not found or error in spec: %v", err))
      }
      //将副本数修改为2个
      spec["replicas"] = int64(2)

      // 更新 存在资源的spec
      if err := unstructured.SetNestedMap(result.Object, spec, "spec", ); err != nil {
         fmt.Println("errrrrr\n")
         panic(err)
      }
      fmt.Println("result-------------",result)
      // 更新资源
      objUpdate, err = dr.Update(context.TODO(),result,metav1.UpdateOptions{})
      log.Print("update : ",objUpdate)
      return err
   })
   if retryErr != nil {
      panic(fmt.Errorf("update failed: %v", retryErr))
   } else {
      log.Print("更新成功")
   }


   //删除
   err = dr.Delete(context.TODO(),obj.GetName(),metav1.DeleteOptions{})
   if err != nil {
      panic(fmt.Errorf("delete resource ERROR : %v", err))
   } else {
      log.Print("删除成功")
   }
}

相关链接

  1. client-go库使用-k8s四种客户端介绍:https://xieys.club/client-go/

  2. 复盘 kubernetes client-go:https://morven.life/posts/k8s-client-go/

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/233
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 核心流程-Watch 流程
Kubernetes 客户端-controller client 介绍
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%