基础
在客户端发起 exec 请求之后,比如 kubectl exec -it
之后,
- APIServer 向 pod 所在的 kubelet 发送 Exec 请求。
- kubelet 向 CRI shim 发送 Exec 请求,请求 Exec 的 URL。
- CRI shim 给 kubelet 返回 Exec 的 URL,通常是 Streaming Server 的地址。
- kubelet 给 APIServer 返回重定向的响应。
- APIServer 重定向请求到 Exec URL。之后就是 APIServer 跟 CRI shim 中的 Streaming Server 进行数据交互。
源码解析
按照基本流程,分组件对 Exec 的执行流程进行解析。
APIServer 侧
APIServer 侧在注册 API handler 的时候,走的是 Connector 的逻辑,
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
switch action.Verb {
...
case "CONNECT":
for _, method := range connecter.ConnectMethods() {
...
handler := metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, restfulConnectResource(connecter, reqScope, admit, path, isSubresource))
route := ws.Method(method).Path(action.Path).
To(handler).
...
}
}
...
}
从这里入手后,接下去的调用链路是 restfulConnectResource()->handlers.ConnectResource()
。APIServer 在收到 Exec 请求之后,最终负责处理的是 handlers.ConnectResource() 返回的函数。在这个函数中,
- connecter.Connect() 获取到跟 kubelet 连接的地址,并返回一个连接可升级的 handler。
- handler.ServeHTTP() 向 kubelet 发起连接,并负责后续的处理,比如重定向。
func ConnectResource(connecter rest.Connecter, scope *RequestScope, admit admission.Interface, restPath string, isSubresource bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
handler, err := connecter.Connect(ctx, name, opts, &responder{scope: scope, req: req, w: w})
if err != nil {
scope.err(err, w, req)
return
}
handler.ServeHTTP(w, req)
})
}
}
connecter.Connect() 的代码如下所示,
- 首先是 pod.ExecLocation->pod.streamLocation()。在这里可以看到获取 node 相关的信息,其实就是获取 kubelet 的连接信息。
- 之后是根据 location 信息返回一个 handler。
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
execOpts, ok := opts.(*api.PodExecOptions)
...
location, transport, err := pod.ExecLocation(ctx, r.Store, r.KubeletConn, name, execOpts)
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}
func streamLocation(ctx context.Context, getter ResourceGetter, connInfo client.ConnectionInfoGetter, name string, opts runtime.Object, container, path string,) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(ctx, getter, name)
nodeName := types.NodeName(pod.Spec.NodeName)
nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)
...
loc := &url.URL{
Scheme: nodeInfo.Scheme,
Host: net.JoinHostPort(nodeInfo.Hostname, nodeInfo.Port),
Path: fmt.Sprintf("/%s/%s/%s/%s", path, pod.Namespace, pod.Name, container),
RawQuery: params.Encode(),
}
return loc, nodeInfo.Transport, nil
}
总结下,APIServer 侧接收到 Exec 后的行为是,
- 向 Kubelet 发起 Exec 请求,Kubelet 返回重定向响应,重定向的地址是 Streaming Server 的地址。
- APIServer 重定向请求到 Streaming Server,之后就是 APIServer 和 Streaming Server 之间的数据交互。
Kubelet 侧
Kubelet 在启动的时候也会启动一个 http server,注册针对 exec 的 handler。跟 exec 相关的 handler 注册如下,所有的路径最后都由 Server.getExec() 方法来处理。
func (s *Server) InstallDebuggingHandlers() {
...
s.addMetricsBucketMatcher("exec")
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
...
}
Server.getExec() 方法中,
- 首先调用 host.GetExec() 获取 URL。在 host.GetExec() 中的调用链路为
Kubelet.GetExec()->streamingRuntime.GetExec()->runtimeService.Exec()
,也就是最终调用的是 CRI Shim 的 Exec() 方法,比如 containerd 的 CRI shim 的 Exec() 方法。 - 在获取 URL 之后,以重定向的方式返回给 APIServer。
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
...
url, err := s.host.GetExec(request.Request.Context(), podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
...
proxyStream(response.ResponseWriter, request.Request, url)
}
总结下,Kubelet 侧在收到 Exec 请求后,
- 先是请求 CRI shim 的 Exec 接口获取 URL 地址,获取到的是 CRI shim 中 Streaming Server 的地址。
- 再将 URL 以重定向的方式返回给 APIServer。
CRI Shim 侧
CRI shim 通常采用的方式是在内部启动一个 streamingServer 专门提供 stream 服务。kubelet 为了方便各个 container runtime 实现 CRI shim,提供了统一的包,位于 kubelet/pkg/cri/streaming
中,各个 CRI shim 只需要实现其中的 steaming.Runtime 接口即可创建一个 streamingServer。而 CRI shim 自身针对 Exec() 请求返回的 URL 则是 streamingServer 的地址。
type Runtime interface {
Exec(ctx context.Context, containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
Attach(ctx context.Context, containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
PortForward(ctx context.Context, podSandboxID string, port int32, stream io.ReadWriteCloser) error
}
先来看下 containerd 的 CRI Shim 针对 Exec 请求的处理,它其实就是调用 streamServer.GetExec() 方法返回一个地址。可以看到这里返回的是一个简单 token 组合成的 URL。streamingServer 会将 token 和对应的请求缓存在本地,后面真正的 exec 请求需要携带这个 token,streamingServer 再根据这个 token 找到之前的请求。
// internal/cri/server/container_exec.go(https://github.com/containerd/containerd)
func (c *criService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) {
cntr, err := c.containerStore.Get(r.GetContainerId())
state := cntr.Status.Get().State()
return c.streamServer.GetExec(r)
}
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
...
return &runtimeapi.ExecResponse{
Url: s.buildURL("exec", token),
}, nil
}
streamingServer 中针对真正的 Exec 请求的处理,调用的是 server.serveExec() 方法,在 NewServer() 方法就可以看到。之后的调用链路是 remotecommandserver.ServeExec()->executor.ExecInContainer()->Runtime.Exec()
。Runtime 就是 steaming.Runtime 接口,每个 CRI shim 都需要实现这个接口,可以看到在 NewServer() 的时候需要传入具体的 Runtime 实现。
// staging/src/k8s.io/kubelet/pkg/cri/streaming/server.go
func NewServer(config Config, runtime Runtime) (Server, error) {
...
endpoints := []struct {
path string
handler restful.RouteFunction
}{
{"/exec/{token}", s.serveExec},
{"/attach/{token}", s.serveAttach},
{"/portforward/{token}", s.servePortForward},
}
return s, nil
}
在 containerd 的 CRI shim 中,Exec 接口的实现如下,调用的 containerd 的 API。
// internal/cri/server/streaming.go(https://github.com/containerd/containerd)
func (s *streamRuntime) Exec(ctx context.Context, containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
exitCode, err := s.c.execInContainer(ctrdutil.WithNamespace(ctx), containerID, execOptions{
cmd: cmd,
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
resize: resize,
})
return &exec.CodeExitError{
Err: fmt.Errorf("error executing command %v, exit code %d", cmd, *exitCode),
Code: int(*exitCode),
}
}
总结下,
- containerd 的 CRI shim 针对 Exec() 接口返回的是 Streaming Server 的 URL。
- containerd 的 CRI shim 中的 Streaming Server 的 Exec() 是真正用于处理 Exec 请求的。
相关链接
kubectl exec 在kubelet中的处理流程:https://www.cnblogs.com/gaorong/p/11873114.html
kubectl exec 的来龙去脉:https://blog.fleeto.us/post/how-kubectl-exec-works/
containerd 的 CRI shim 实现:https://github.com/containerd/containerd/tree/main/internal/cri/server