程序锅

  • 首页
  • 分类
  • 标签
  • 归档
  • 关于

  • 搜索
基础知识 Etcd LeetCode 计算机体系结构 Kubernetes Containerd Docker 容器 云原生 Serverless 项目开发维护 ELF 深入理解程序 Tmux Vim Linux Kernel Linux numpy matplotlib 机器学习 MQTT 网络基础 Thrift RPC OS 操作系统 Clang 研途 数据结构和算法 Java 编程语言 Golang Python 个人网站搭建 Nginx 计算机通用技术 Git

Kubernetes 核心流程-Exec 流程

发表于 2023-03-28 | 分类于 Kubernetes | 0 | 阅读次数 2143

基础

在客户端发起 exec 请求之后,比如 kubectl exec -it 之后,

  1. APIServer 向 pod 所在的 kubelet 发送 Exec 请求。
  2. kubelet 向 CRI shim 发送 Exec 请求,请求 Exec 的 URL。
  3. CRI shim 给 kubelet 返回 Exec 的 URL,通常是 Streaming Server 的地址。
  4. kubelet 给 APIServer 返回重定向的响应。
  5. APIServer 重定向请求到 Exec URL。之后就是 APIServer 跟 CRI shim 中的 Streaming Server 进行数据交互。

源码解析

按照基本流程,分组件对 Exec 的执行流程进行解析。

APIServer 侧

APIServer 侧在注册 API handler 的时候,走的是 Connector 的逻辑,

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	...
  switch action.Verb {
    ...
    case "CONNECT":
      for _, method := range connecter.ConnectMethods() {
 				...
        handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulConnectResource(connecter, reqScope, admit, path, isSubresource))
        route := ws.Method(method).Path(action.Path).
        To(handler).
        ...
      }
  }
	...
}

从这里入手后,接下去的调用链路是 restfulConnectResource()->handlers.ConnectResource()。APIServer 在收到 Exec 请求之后,最终负责处理的是 handlers.ConnectResource() 返回的函数。在这个函数中,

  • connecter.Connect() 获取到跟 kubelet 连接的地址,并返回一个连接可升级的 handler。
  • handler.ServeHTTP() 向 kubelet 发起连接,并负责后续的处理,比如重定向。
func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		...
		metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
			handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
			if err != nil {
				scope.err(err, w, req)
				return
			}
			handler.ServeHTTP(w, req)
		})
	}
}

connecter.Connect() 的代码如下所示,

  • 首先是 pod.ExecLocation->pod.streamLocation()。在这里可以看到获取 node 相关的信息,其实就是获取 kubelet 的连接信息。
  • 之后是根据 location 信息返回一个 handler。
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
	execOpts, ok := opts.(*api.PodExecOptions)
	...
	location, transport, err := pod.ExecLocation(ctx, r.Store, r.KubeletConn, name, execOpts)
	
	return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

func streamLocation(ctx context.Context, getter ResourceGetter, connInfo client.ConnectionInfoGetter, name string, opts runtime.Object, container, path string,) (*url.URL, http.RoundTripper, error) {
	pod, err := getPod(ctx, getter, name)

	nodeName := types.NodeName(pod.Spec.NodeName)
	
	nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
	...
	loc := &url.URL{
		Scheme:   nodeInfo.Scheme,
		Host:     net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
		Path:     fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
		RawQuery: params.Encode(),
	}
	return loc, nodeInfo.Transport, nil
}

总结下,APIServer 侧接收到 Exec 后的行为是,

  • 向 Kubelet 发起 Exec 请求,Kubelet 返回重定向响应,重定向的地址是 Streaming Server 的地址。
  • APIServer 重定向请求到 Streaming Server,之后就是 APIServer 和 Streaming Server 之间的数据交互。

Kubelet 侧

Kubelet 在启动的时候也会启动一个 http server,注册针对 exec 的 handler。跟 exec 相关的 handler 注册如下,所有的路径最后都由 Server.getExec() 方法来处理。

func (s *Server) InstallDebuggingHandlers() {
	...
	s.addMetricsBucketMatcher("exec")
	ws = new(restful.WebService)
	ws.
		Path("/exec")
	ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
		To(s.getExec).
		Operation("getExec"))
	s.restfulCont.Add(ws)
	...
}

Server.getExec() 方法中,

  • 首先调用 host.GetExec() 获取 URL。在 host.GetExec() 中的调用链路为 Kubelet.GetExec()->streamingRuntime.GetExec()->runtimeService.Exec(),也就是最终调用的是 CRI Shim 的 Exec() 方法,比如 containerd 的 CRI shim 的 Exec() 方法。
  • 在获取 URL 之后,以重定向的方式返回给 APIServer。
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
	...
	url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
	...
	proxyStream(response.ResponseWriter, request.Request, url)
}

总结下,Kubelet 侧在收到 Exec 请求后,

  • 先是请求 CRI shim 的 Exec 接口获取 URL 地址,获取到的是 CRI shim 中 Streaming Server 的地址。
  • 再将 URL 以重定向的方式返回给 APIServer。

CRI Shim 侧

CRI shim 通常采用的方式是在内部启动一个 streamingServer 专门提供 stream 服务。kubelet 为了方便各个 container runtime 实现 CRI shim,提供了统一的包,位于 kubelet/pkg/cri/streaming 中,各个 CRI shim 只需要实现其中的 steaming.Runtime 接口即可创建一个 streamingServer。而 CRI shim 自身针对 Exec() 请求返回的 URL 则是 streamingServer 的地址。

type Runtime interface {
	Exec(ctx context.Context, containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
	Attach(ctx context.Context, containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
	PortForward(ctx context.Context, podSandboxID string, port int32, stream io.ReadWriteCloser) error
}

先来看下 containerd 的 CRI Shim 针对 Exec 请求的处理,它其实就是调用 streamServer.GetExec() 方法返回一个地址。可以看到这里返回的是一个简单 token 组合成的 URL。streamingServer 会将 token 和对应的请求缓存在本地,后面真正的 exec 请求需要携带这个 token,streamingServer 再根据这个 token 找到之前的请求。

// internal/cri/server/container_exec.go(https://github.com/containerd/containerd)
func (c *criService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) {
	cntr, err := c.containerStore.Get(r.GetContainerId())
	
	state := cntr.Status.Get().State()

	return c.streamServer.GetExec(r)
}

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
  ...
	return &runtimeapi.ExecResponse{
		Url: s.buildURL("exec", token),
	}, nil
}

streamingServer 中针对真正的 Exec 请求的处理,调用的是 server.serveExec() 方法,在 NewServer() 方法就可以看到。之后的调用链路是 remotecommandserver.ServeExec()->executor.ExecInContainer()->Runtime.Exec()。Runtime 就是 steaming.Runtime 接口,每个 CRI shim 都需要实现这个接口,可以看到在 NewServer() 的时候需要传入具体的 Runtime 实现。

// staging/src/k8s.io/kubelet/pkg/cri/streaming/server.go
func NewServer(config Config, runtime Runtime) (Server, error) {
	...
	endpoints := []struct {
		path    string
		handler restful.RouteFunction
	}{
		{"/exec/{token}", s.serveExec},
		{"/attach/{token}", s.serveAttach},
		{"/portforward/{token}", s.servePortForward},
	}
  
	return s, nil
}

在 containerd 的 CRI shim 中,Exec 接口的实现如下,调用的 containerd 的 API。

// internal/cri/server/streaming.go(https://github.com/containerd/containerd)
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	exitCode, err := s.c.execInContainer(ctrdutil.WithNamespace(ctx), containerID, execOptions{
		cmd:    cmd,
		stdin:  stdin,
		stdout: stdout,
		stderr: stderr,
		tty:    tty,
		resize: resize,
	})
	
	return &exec.CodeExitError{
		Err:  fmt.Errorf("error executing command %v, exit code %d", cmd, *exitCode),
		Code: int(*exitCode),
	}
}

总结下,

  • containerd 的 CRI shim 针对 Exec() 接口返回的是 Streaming Server 的 URL。
  • containerd 的 CRI shim 中的 Streaming Server 的 Exec() 是真正用于处理 Exec 请求的。

相关链接

kubectl exec 在kubelet中的处理流程:https://www.cnblogs.com/gaorong/p/11873114.html

kubectl exec 的来龙去脉:https://blog.fleeto.us/post/how-kubectl-exec-works/

containerd 的 CRI shim 实现:https://github.com/containerd/containerd/tree/main/internal/cri/server

卷死我
dawnguo 微信支付

微信支付

dawnguo 支付宝

支付宝

  • 本文作者: dawnguo
  • 本文链接: /archives/228
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Kubernetes
Kubernetes 调度原理
Kubernetes 核心流程-Pod 创建过程
  • 文章目录
  • 站点概览
dawnguo

dawnguo

215 日志
24 分类
37 标签
RSS
Creative Commons
© 2018 — 2025 程序锅
0%