✍️ Author: 茶水间Tech 🏷️ Tags: #CloudComputing #CloudNative #kubernetes #containers

## 📖 Preface
Kubernetes has many modules, a complex architecture, and a very large code base, which makes reading the source painful. So instead of reading it cold, we start from a real-world scenario — creating a Pod — and trace the code flow inside Kubernetes. This series walks the Kubernetes source along the whole Pod-creation path; this installment focuses on the kubelet side.

This article is based on Client Version: v1.34.3, Server Version: v1.34.2.

*(Figure: overall architecture of Pod creation)*

## Main Body

### 1. About kubelet

In a Kubernetes cluster, every node runs a kubelet process that carries out the tasks the master hands down to that node and manages the Pods and their containers.

The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object that describes a Pod. The kubelet takes a set of PodSpecs provided through various mechanisms (primarily the apiserver) and ensures that the containers described in those PodSpecs are running and healthy. The kubelet on every node maintains a long-lived connection to the API server.

- Filtered watch: the kubelet only cares about Pods that belong to it. It issues a request along the lines of `GET /api/v1/pods?watch=true&fieldSelector=spec.nodeName={My_Node_Name}`. The `fieldSelector` is critical — it guarantees that Node-A never receives Pods destined for Node-B.
- Event delivery: when the scheduler completes the Bind step, the Pod object's `nodeName` field is updated in the API server. The API server immediately pushes a "MODIFIED" event down that long-lived connection to the kubelet on the chosen node.

### 2. Code Analysis

Entry point: `Run` (kubelet.go). `Run` starts the `syncLoop` that drives continuous synchronization.

Code path: `kubernetes/pkg/kubelet/kubelet.go`

```go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// ...(omitted)
	kl.syncLoop(ctx, updates, kl)
}
```

The detailed flow is shown below. *(Figure: kubelet-side flow)*

#### 2.1 Main loop: syncLoop (kubelet.go)

This `syncLoop` is the heart of the kubelet: it keeps the Pods on the node converging toward their desired state.

Code path: `kubernetes/pkg/kubelet/kubelet.go`

```go
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.InfoS("Starting kubelet main sync loop")
	// The syncTicker wakes up kubelet to check if there are any pod workers
	// that need to be synced. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	// sync ticker: fires every second to check for pods that need syncing
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	// housekeeping ticker: periodic cleanup tasks (default 2 minutes)
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond // initial delay: 100ms
		max    = 5 * time.Second        // maximum delay: 5s
		factor = 2                      // exponential factor: doubles each time
	)
	duration := base

	// Responsible for checking limits in resolv.conf
	// The limits do not have anything to do with individual pods
	// Since this is called in syncLoop, we don't need to call it anywhere else
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf(klog.FromContext(ctx))
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			// compute the next backoff delay, capped at 5s
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		// run one sync iteration
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
```
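The backoff arithmetic in `syncLoop` is easy to verify in isolation. Below is a minimal, self-contained sketch (not kubelet code) of the same pattern: start at 100ms, double on every consecutive runtime failure, saturate at the 5s cap, and reset on success.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Simulate seven consecutive runtime errors: the delay doubles each
	// time and saturates at the cap (100ms, 200ms, 400ms, ..., 5s).
	for i := 0; i < 7; i++ {
		fmt.Printf("attempt %d: sleeping %v\n", i+1, duration)
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
	// A successful iteration resets the delay back to the base value.
	duration = base
	fmt.Println("success: reset to", duration)
}
```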
#### 2.2 Sync iteration: syncLoopIteration (kubelet.go)

`syncLoopIteration` dispatches each Pod operation to the matching handler. When it sees a Pod-creation event, it calls `HandlePodAdditions`.

Code path: `kubernetes/pkg/kubelet/kubelet.go`

```go
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	logger := klog.FromContext(ctx)
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD: // a Pod-creation event was received
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			// the main pod handler
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		// ...(omitted)
		}
	}
	return true
}
```
#### 2.3 Main handler: HandlePodAdditions (kubelet.go)

`HandlePodAdditions` is the kubelet's core handler for newly added Pods. When the kubelet receives new Pods from the API server, this function is responsible for:

- Pod registration: adds the new Pod to the pod manager, the single source of truth for desired state
- Admission control: checks whether the node has enough resources to accept the new Pod
- Certificate management: tracks the Pod's certificate information
- Mirror-pod handling: deals with the mirror pods of static Pods
- Vertical scaling: supports in-place vertical scaling of Pods (InPlacePodVerticalScaling)
- Work dispatch: hands the Pod to the pod workers for actual creation and syncing

Code path: `kubernetes/pkg/kubelet/kubelet.go`

```go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	// sort the pods: static pods first, then by creation time
	sort.Sort(sliceutils.PodsByCreationTime(pods))

	var pendingResizes []types.UID
	for _, pod := range pods {
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		kl.podCertificateManager.TrackPod(context.TODO(), pod)

		// fetch the pod and its mirror pod
		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
		if wasMirror {
			if pod == nil {
				klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
				continue
			}
			kl.podWorkers.UpdatePod(UpdatePodOptions{
				Pod:        pod,
				MirrorPod:  mirrorPod,
				UpdateType: kubetypes.SyncPodUpdate,
				StartTime:  start,
			})
			continue
		}

		// Only go through the admission process if the pod is not requested
		// for termination by another part of the kubelet. If the pod is already
		// using resources (previously admitted), the pod worker is going to be
		// shutting it down. If the pod hasn't started yet, we know that when
		// the pod worker is invoked it will also avoid setting up the pod, so
		// we simply avoid doing any work.
		// We also do not try to admit the pod that is already in terminated state.
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
			// Check if we can admit the pod; if not, reject it.
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			// checks whether resources suffice;
			// returns admit/deny, the rejection reason, and a detailed message
			if ok, reason, message := kl.allocationManager.AddPod(kl.GetActivePods(), pod); !ok {
				kl.rejectPod(pod, reason, message)
				// We avoid recording the metric in canAdmitPod because it's called
				// repeatedly during a resize, which would inflate the metric.
				// Instead, we record the metric here in HandlePodAdditions for new pods
				// and capture resize events separately.
				recordAdmissionRejection(reason)
				continue
			}

			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
				// Backfill the queue of pending resizes, but only after all the pods have
				// been added. This ensures that no resizes get resolved until all the
				// existing pods are added.
				_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
				if updatedFromAllocation {
					pendingResizes = append(pendingResizes, pod.UID)
				}
			}
		}

		// create the pod asynchronously via podWorkers.UpdatePod
		kl.podWorkers.UpdatePod(UpdatePodOptions{
			Pod:        pod,
			MirrorPod:  mirrorPod,
			UpdateType: kubetypes.SyncPodCreate, // creation
			StartTime:  start,
		})
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		// backfill pod resize conditions
		kl.statusManager.BackfillPodResizeConditions(pods)
		// push pending resize requests
		for _, uid := range pendingResizes {
			kl.allocationManager.PushPendingResize(uid)
		}
		// retry pending resizes
		if len(pendingResizes) > 0 {
			kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
		}
	}
}
```

a. Pod Manager

```go
type podManager struct {
	// desired state of all pods
	pods map[types.UID]*v1.Pod
	// mirror pod mapping
	mirrorPods map[types.UID]*v1.Pod
	// static pod mapping
	staticPods map[types.UID]*v1.Pod
}
```

b. Allocation Manager

```go
type allocationManager struct {
	// manages node resource allocation:
	// CPU, memory, devices, etc.
}

func (m *allocationManager) AddPod(activePods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
	// checks whether resources suffice;
	// returns admit/deny, the rejection reason, and a detailed message
}
```

c. In-place pod vertical scaling

This feature allows adjusting a Pod's resources without restarting it:

```go
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
	kl.statusManager.BackfillPodResizeConditions(pods)
	for _, uid := range pendingResizes {
		kl.allocationManager.PushPendingResize(uid)
	}
	if len(pendingResizes) > 0 {
		kl.allocationManager.RetryPendingResizes(allocation.TriggerReasonPodsAdded)
	}
}
```

#### 2.4 Scheduling the creation: UpdatePod (pod_workers.go)

`UpdatePod` is the kubelet's core entry point for handling Pod updates. It is responsible for:

- State management: maintains each Pod's sync state and tracks its lifecycle (first sync, terminating, terminated)
- Update dispatch: receives Pod update requests and routes them to the corresponding worker goroutine
- Lifecycle transitions: drives Pods through the states from creation to termination
- Resource management: integrates with the allocation manager to support in-place vertical scaling
- Concurrency control: coordinates concurrent update requests so state stays consistent

Code path: `kubernetes/pkg/kubelet/pod_workers.go`

```go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// ...(omitted)

	// start the pod worker goroutine if it doesn't exist
	podUpdates, exists := p.podUpdates[uid]
	if !exists {
		// buffer the channel to avoid blocking this method
		podUpdates = make(chan struct{}, 1)
		p.podUpdates[uid] = podUpdates

		// ensure that static pods start in the order they are received by UpdatePod
		if kubetypes.IsStaticPod(pod) {
			p.waitingToStartStaticPodsByFullname[status.fullname] =
				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
		}

		// allow testing of delays in the pod update channel
		var outCh <-chan struct{}
		if p.workerChannelFn != nil {
			outCh = p.workerChannelFn(uid, podUpdates)
		} else {
			outCh = podUpdates
		}

		// spawn a pod worker goroutine
		go func() {
			// TODO: this should be a wait.Until with backoff to handle panics, and
			// accept a context for shutdown
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()
	}
	// ...(omitted)
}
```
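The one-slot buffered channel in `UpdatePod` is what lets updates coalesce: a second notification arriving while the worker is busy simply finds the buffer already full and is dropped, because the pending work will be picked up anyway. A stripped-down sketch of that pattern (illustrative only — the real `podWorkers` also tracks a `pendingUpdate` per Pod):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type workers struct {
	mu      sync.Mutex
	updates map[string]chan struct{} // one signal channel per pod UID
}

// Update records that the pod needs a sync and wakes its worker.
func (w *workers) Update(uid string) {
	w.mu.Lock()
	ch, ok := w.updates[uid]
	if !ok {
		ch = make(chan struct{}, 1) // buffer of 1: never blocks the caller
		w.updates[uid] = ch
		go func() { // one goroutine per pod, like podWorkerLoop
			for range ch {
				fmt.Println("syncing pod", uid)
				time.Sleep(50 * time.Millisecond) // pretend work
			}
		}()
	}
	w.mu.Unlock()

	select {
	case ch <- struct{}{}: // wake the worker
	default: // a wakeup is already pending; coalesce this one
	}
}

func main() {
	w := &workers{updates: map[string]chan struct{}{}}
	for i := 0; i < 5; i++ {
		w.Update("pod-a") // a burst collapses into at most one pending sync
	}
	time.Sleep(200 * time.Millisecond)
}
```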
#### 2.5 The actual work loop: podWorkerLoop (pod_workers.go)

`podWorkerLoop` is the independent work loop for a single Pod — every Pod gets its own dedicated goroutine running this function — and it handles all sync operations for that Pod:

- Event-driven processing: receives the Pod's update events over the `podUpdates` channel
- State sync: performs the sync operation matching the Pod's current work type
- Lifecycle management: covers the full lifecycle from creation through running to termination
- Error recovery: retries failed operations so the Pod's state eventually converges
- Resource cleanup: cleans up and shuts the worker down when the Pod terminates

In the end it calls `SyncPod` to perform the sync.

Code path: `kubernetes/pkg/kubelet/pod_workers.go`

```go
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
	var lastSyncTime time.Time
	for range podUpdates {
		// begin a sync attempt
		ctx, update, canStart, canEverStart, ok := p.startPodSync(podUID)
		// If we had no update waiting, it means someone initialized the channel without filling out pendingUpdate.
		//...(omitted)

		err := func() error {
			var err error
			var isTerminal bool
			// Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)
				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				// the normal pod sync path
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}

			lastSyncTime = p.clock.Now()
			return err
		}()
		//...(omitted)
	}
}
```

#### 2.6 kubelet-side creation: SyncPod (kubelet.go)

`SyncPod` is the kubelet's core function for syncing Pod state; it aligns a Pod's desired state with its actual state. Its main responsibilities:

- State sync: performs whatever operations are needed to reconcile the Pod's desired state with the current runtime state
- Lifecycle management: handles the full lifecycle from creation through running to termination
- Resource management: creates and manages the Pod's cgroups, ensuring resource isolation and QoS
- Volume management: waits for and mounts the volumes the Pod needs
- Network configuration: checks network plugin status and handles host-network Pods
- Observability: records metrics, logs, and traces for performance monitoring and diagnosis

Finally it calls `kl.containerRuntime.SyncPod`, handing off to the container runtime.

Code path: `kubernetes/pkg/kubelet/kubelet.go`

```go
func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	// ...(omitted)

	// Make data directories for the pod
	if err := kl.makePodDataDirs(pod); err != nil {
		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
		klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
		return false, err
	}

	// Wait for volumes to attach/mount
	if err := kl.volumeManager.WaitForAttachAndMount(ctx, pod); err != nil {
		var volumeAttachLimitErr *volumemanager.VolumeAttachLimitExceededError
		if errors.As(err, &volumeAttachLimitErr) {
			kl.rejectPod(pod, volumemanager.VolumeAttachmentLimitExceededReason, volumeAttachLimitErr.Error())
			recordAdmissionRejection(volumemanager.VolumeAttachmentLimitExceededReason)
			return true, nil
		}
		if !wait.Interrupted(err) {
			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
		}
		return false, err
	}

	// Fetch the pull secrets for the pod
	pullSecrets := kl.getPullSecretsForPod(pod)

	//...(omitted)

	// call the container runtime to create and start containers
	result := kl.containerRuntime.SyncPod(sctx, pod, podStatus, pullSecrets, kl.crashLoopBackOff)
	kl.reasonCache.Update(pod.UID, result)

	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		for _, r := range result.SyncResults {
			if r.Action == kubecontainer.ResizePodInPlace && r.Error != nil {
				// If the condition already exists, the observedGeneration does not get updated.
				kl.statusManager.SetPodResizeInProgressCondition(pod.UID, v1.PodReasonError, r.Message, pod.Generation)
			}
		}
	}

	return false, result.Error()
}
```
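One detail worth pausing on: the volume wait distinguishes a terminal condition (attach limit exceeded → reject the Pod, return `isTerminal=true`) from a transient error (return the error so the worker retries). A minimal sketch of that `errors.As` decision pattern, using a hypothetical stand-in error type:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in for volumemanager.VolumeAttachLimitExceededError.
type attachLimitError struct{ limit int }

func (e *attachLimitError) Error() string {
	return fmt.Sprintf("node volume attach limit %d exceeded", e.limit)
}

// syncStep mirrors SyncPod's return contract:
// (isTerminal=true, nil)  -> the pod is rejected; do not retry
// (isTerminal=false, err) -> transient failure; the pod worker retries
func syncStep(err error) (isTerminal bool, retErr error) {
	var limitErr *attachLimitError
	if errors.As(err, &limitErr) {
		fmt.Println("rejecting pod:", limitErr)
		return true, nil
	}
	return false, err
}

func main() {
	fmt.Println(syncStep(&attachLimitError{limit: 16}))  // terminal
	fmt.Println(syncStep(errors.New("mount timed out"))) // retried
}
```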
#### 2.7 Runtime-side: SyncPod (kuberuntime_manager.go)

`SyncPod` in the container-runtime manager is the core sync function that aligns the Pod's desired state with the container runtime's actual state. Its main responsibilities:

- Pod sync: reconciles the Pod spec with the current runtime state
- Sandbox management: creates, updates, or deletes the Pod sandbox (the runtime isolation environment)
- Container lifecycle: starts, stops, and restarts containers — init containers, ephemeral containers, and regular containers
- Resource management: handles container resource allocation and in-place vertical scaling
- Network configuration: manages Pod IP addresses and network setup
- Image management: pulls container images and image volumes
- Error handling: records sync results and errors, with retry support

Code path: `kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go`

```go
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Resize running containers (if InPlacePodVerticalScaling==true)
//  8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	logger := klog.FromContext(ctx)
	//...(omitted)

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		var msg string
		var err error

		logger.V(4).Info("Creating PodSandbox for pod", "pod", klog.KObj(pod))
		//...(omitted)
		// create the sandbox
		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
		//...(omitted)
		logger.V(4).Info("Created PodSandbox for pod", "podSandboxID", podSandboxID, "pod", klog.KObj(pod))

		// query the sandbox status
		resp, err := m.runtimeService.PodSandboxStatus(ctx, podSandboxID, false)
		//...(omitted)

		// If we ever allow updating a pod from non-host-network to
		// host-network, we may use a stale IP.
		// determine the pod's IPs
		if !kubecontainer.IsHostNetworkPod(pod) {
			// Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
			podIPs = m.determinePodSandboxIPs(ctx, pod.Namespace, pod.Name, resp.GetStatus())
			logger.V(4).Info("Determined the ip for pod after sandbox changed", "IPs", podIPs, "pod", klog.KObj(pod))
		}
	}

	// the start containers routines depend on pod ip(as in primary pod ip)
	// instead of trying to figure out if we have 0 < len(podIPs)
	// everytime, we short circuit it here
	podIP := ""
	if len(podIPs) != 0 {
		podIP = podIPs[0]
	}

	// Get podSandboxConfig for containers to start.
	configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
	result.AddSyncResult(configPodSandboxResult)
	podSandboxConfig, err := m.generatePodSandboxConfig(ctx, pod, podContainerChanges.Attempt)
	if err != nil {
		message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
		logger.Error(err, "GeneratePodSandboxConfig for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
		return
	}

	imageVolumePullResults, err := m.getImageVolumes(ctx, pod, podSandboxConfig, pullSecrets)
	if err != nil {
		logger.Error(err, "Get image volumes for pod failed", "pod", klog.KObj(pod))
		configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, err.Error())
		return
	}

	// Helper containing boilerplate common to starting all types of containers.
	// typeName is a description used to describe this type of container in log messages,
	// currently: "container", "init container" or "ephemeral container"
	// metricLabel is the label used to describe this type of container in monitoring metrics.
	// currently: "container", "init_container" or "ephemeral_container"
	// with the sandbox ready, start the containers inside the pod
	start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(ctx, pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			logger.V(4).Info("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		logger.V(4).Info("Creating container in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod))

		// We fail late here to populate the "ErrImagePull" and "ImagePullBackOff" correctly to the end user.
		imageVolumes, err := m.toKubeContainerImageVolumes(ctx, imageVolumePullResults, spec.container, pod, startContainerResult)
		if err != nil {
			return err
		}

		// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
		msg, err = m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs, imageVolumes)
		incrementImageVolumeMetrics(err, msg, spec.container, imageVolumes)
		if err != nil {
			// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
			// useful to cluster administrators to distinguish "server errors" from "user errors".
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				logger.V(3).Info("Container start failed in pod", "containerType", typeName, "container", spec.container.Name, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %v start failed in pod %v: %w: %s", typeName, spec.container.Name, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

	// Step 5: start ephemeral containers
	// These are started "prior" to init containers to allow running ephemeral containers even when there
	// are errors starting an init container. In practice init containers will start first since ephemeral
	// containers cannot be specified on pod creation.
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
	}

	// Step 6: start init containers.
	for _, idx := range podContainerChanges.InitContainersToStart {
		container := &pod.Spec.InitContainers[idx]
		// Start the next init container.
		if err := start(ctx, "init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
			if podutil.IsRestartableInitContainer(container) {
				logger.V(4).Info("Failed to start the restartable init container for the pod, skipping", "initContainerName", container.Name, "pod", klog.KObj(pod))
				continue
			}
			logger.V(4).Info("Failed to initialize the pod, as the init container failed to start, aborting", "initContainerName", container.Name, "pod", klog.KObj(pod))
			return
		}

		// Successfully started the container; clear the entry in the failure
		logger.V(4).Info("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
	}

	// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
	if resizable, _, _ := allocation.IsInPlacePodVerticalScalingAllowed(pod); resizable {
		if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
			result.SyncResults = append(result.SyncResults, m.doPodResizeAction(ctx, pod, podStatus, podContainerChanges))
		}
	}

	// Step 8: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}

	return
}
```
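Before diving into `createPodSandbox`, it helps to see roughly what a CRI sandbox config contains. Below is a hedged sketch using the CRI v1 API types — all field values are illustrative placeholders; in reality `generatePodSandboxConfig` derives everything from the PodSpec.

```go
package main

import (
	"fmt"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	// Illustrative values only; the kubelet fills these in from the PodSpec.
	cfg := &runtimeapi.PodSandboxConfig{
		Metadata: &runtimeapi.PodSandboxMetadata{
			Name:      "nginx",
			Namespace: "default",
			Uid:       "11111111-2222-3333-4444-555555555555",
			Attempt:   0, // incremented each time the sandbox is recreated
		},
		Hostname:     "nginx",
		LogDirectory: "/var/log/pods/default_nginx_1111/",
		Labels: map[string]string{
			"io.kubernetes.pod.name":      "nginx",
			"io.kubernetes.pod.namespace": "default",
		},
	}
	fmt.Printf("sandbox config: %+v\n", cfg)
}
```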
#### 2.8 Creating the sandbox: createPodSandbox (kuberuntime_sandbox.go)

`createPodSandbox` is the runtime manager's core function for creating the Pod sandbox. The sandbox is the container runtime's isolation environment: it provides the namespaces (network, PID, and so on) shared by the Pod's containers. Its main steps:

- Generate the sandbox config: builds the sandbox configuration from the Pod spec
- Create the log directory: sets up log storage for the Pod
- Look up the runtime handler: resolves the container runtime from the RuntimeClass
- Create the sandbox: calls the container runtime interface to create the actual sandbox
- Error handling: checks errors and logs at every step

Code path: `kubernetes/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go`

```go
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
	// ...(omitted)
	podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
	if err != nil {
		message := fmt.Sprintf("Failed to create sandbox for pod %q: %v", format.Pod(pod), err)
		logger.Error(err, "Failed to create sandbox for pod", "pod", klog.KObj(pod))
		return "", message, err
	}
	return podSandBoxID, "", nil
}
```
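`createPodSandbox` delegates to `runtimeService.RunPodSandbox`, which — as 2.9 shows — is a thin gRPC wrapper. For illustration, a standalone program can speak CRI to containerd directly. This is a sketch under assumptions: containerd's CRI socket at its default path, sufficient privileges, and abbreviated error handling; it issues the cheap `Version` call rather than actually creating a sandbox.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	// Assumption: containerd's CRI socket at the default path.
	conn, err := grpc.NewClient("unix:///run/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := runtimeapi.NewRuntimeServiceClient(conn)
	// Mirrors the kubelet's default 4-minute sandbox-creation timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute)
	defer cancel()

	// Version is the cheapest CRI call; a real caller would follow up with
	// client.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{Config: cfg}).
	v, err := client.Version(ctx, &runtimeapi.VersionRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("runtime: %s %s (CRI %s)\n", v.RuntimeName, v.RuntimeVersion, v.RuntimeApiVersion)
}
```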
#### 2.9 Calling the CRI: RunPodSandbox (remote_runtime.go)

`RunPodSandbox` is the core method of the CRI (Container Runtime Interface) client; it calls the remote container runtime over gRPC to create the Pod sandbox. Its main responsibilities:

- Timeout management: applies a timeout to sandbox creation (4 minutes by default)
- Remote call: invokes the container runtime's `RunPodSandbox` endpoint through the gRPC client
- Response validation: verifies that the returned sandbox ID is valid
- Error handling: logs errors and returns appropriate error messages
- Logging: records request and response details for debugging

Code path: `kubernetes/staging/src/k8s.io/cri-client/pkg/remote_runtime.go`

```go
func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
	// ...(omitted)
	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
		Config:         config,
		RuntimeHandler: runtimeHandler,
	})
	// ...(omitted)
	return podSandboxID, nil
}
```

**Containerd**: from here the flow crosses into containerd itself — material for a later installment.

## Summary and Outlook

The kubelet's `syncLoop` performs a full resync periodically (every 10 seconds by default). If a container crashes, on the next sync the kubelet notices that "the API server says this Pod should exist, but containerd has no such container" and immediately calls the CRI again to recreate it. If the network drops, then once connectivity returns the kubelet's watch catch-up mechanism replays all the Pod updates missed during the outage. During Pod creation, the kubelet drives containerd through the CRI to first create the pause sandbox and then the business containers inside it, realizing lifecycle management for the whole Pod.

This is why Kubernetes is "state-driven" rather than "command-driven": the kubelet is forever comparing "what the API server records" against "the actual containers on this machine," and the moment they diverge, it corrects the drift.

## References

- https://kubernetes.io/zh-cn/docs/reference/command-line-tools-reference/kubelet/
- https://blog.huweihuang.com/kubernetes-notes/principle/component/kubernetes-core-principle-kubelet/