51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

k8s 节点 CPU 升级,导致 kubelet 无法启动故障一例

事件背景

大家都知道 k8s 容量不够的时候,都是添加节点来解决问题。这几天有小伙伴在升级 k8s 容量的时候碰到一个问题,他将集群中某一个 node 节点的 CPU 做了升级,然后重启了这个 node 节点导致 kubelet 无法启动,然后大量 pod 被驱逐,报警电话响个不停。为了紧急恢复业务,果断参加故障恢复。

现象获取

在知道事件背景后,我登上了那个已经重启完毕的 node 节点,开始了一系列的网络测试,确认 node 这个宿主机到 Apiserver 和 Loadbalancer 的 ip 和 port 都是通的。随后赶紧看了下 kubelet 的日志,果不其然,一行日志让我看到问题点:

E1121 23:43:52.644552   23453 policy_static.go:158] "Static policy invalid state, please drain node and remove policy state file" err="current set of available CPUs \"0-7\" doesn't match with CPUs in state \"0-3\""
E1121 23:43:52.644569   23453 cpu_manager.go:230] "Policy start error" err="current set of available CPUs \"0-7\" doesn't match with CPUs in state \"0-3\""
E1121 23:43:52.644587   23453 kubelet.go:1431] "Failed to start ContainerManager" err="start cpu manager error: current set of available CPUs \"0-7\" doesn't match with CPUs in state \"0-3\""

说到这里,很多小伙伴会说:"就这??"。

真的就这。是因为啥呢?

是因为 kubelet 启动参数里面有一个参数很重要:--cpu-manager-policy。表示 kubelet 在使用宿主机的 cpu 是什么逻辑策略。如果你设定为 static ,那么就会在参数 --root-dir 指定的目录下生成一个 cpu_manager_state 这样一个绑定文件。

cpu_manager_state 内容大致长得如下:

{ "policyName": "static", "defaultCpuSet": "0-7", "checksum": 14413152 }

当你升级这个 k8s node 节点的 CPU 配置,并且使用了 static cpu 管理模式,那么 kubelet 会读取 cpu_manager_state 文件,然后跟现有的宿主运行的资源做对比,如果不一致,kubelet 就不会启动了。

原理分析

既然我们看到了具体现象和故障位置,不妨借着这个小问题我们一起开温馨下 k8s 的 cpu 管理规范。

官方文档如下:

https://kubernetes.io/zh-cn/docs/tasks/administer-cluster/cpu-management-policies/

当然我还想多少说点别的,关于 CPU Manager 整个架构,让小伙伴们有一个整体理解,能更加深入理解官方的 cpu 管理策略到底是做了些什么动作。

cpu-management-policies

CPU Manager 架构

CPU Manager 为满足条件的 Container 分配指定的 CPUs 时,会尽数按 CPU Topology 来分配,也就是参考 CPU Affinity,按如下的优先顺序进行 CPUs 选择:(Logic CPUs 就是 Hyperthreads)

  • 如果 Container 要求的 Logic CPUs 数量不少于单块 CPU Socket 中 Logci CPUs 数量,那么会优先把整块 CPU Socket 中的 Logic CPUs 分配给 Container。
  • 如果 Container 减余请求的 Logic CPU 数量不少于单块物理 CPU Core 提供的 Logic CPU 数量,那么会优先把整块物理 CPU Core 上的 Logic CPU 分配给 Container。

Container 托余请求的 Logic CPUs 则从按以下规则排列好的 Logic CPUs 列表中选择:

  • 同一插槽上可用的 CPU 数量
  • 同一核心上可用的 CPU 数量

参考代码:pkg/kubelet/cm/cpumanager/cpu_assignment.go

func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
 acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
 if acc.isSatisfied() {
  return acc.result, nil
 }
 if acc.isFailed() {
  return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
 }

 // Algorithm: topology-aware best-fit
 // 1. Acquire whole sockets, if available and the container requires at
 // least a socket's-worth of CPUs.
 for _, s := range acc.freeSockets() {
  if acc.needs(acc.topo.CPUsPerSocket()) {
   glog.V(4).Infof("[cpumanager] takeByTopology: claiming socket [%d]", s)
   acc.take(acc.details.CPUsInSocket(s))
   if acc.isSatisfied() {
    return acc.result, nil
   }
  }
 }

 // 2. Acquire whole cores, if available and the container requires at least
 // a core's-worth of CPUs.
 for _, c := range acc.freeCores() {
  if acc.needs(acc.topo.CPUsPerCore()) {
   glog.V(4).Infof("[cpumanager] takeByTopology: claiming core [%d]", c)
   acc.take(acc.details.CPUsInCore(c))
   if acc.isSatisfied() {
    return acc.result, nil
   }
  }
 }

 // 3. Acquire single threads, preferring to fill partially-allocated cores
 // on the same sockets as the whole cores we have already taken in this
 // allocation.
 for _, c := range acc.freeCPUs() {
  glog.V(4).Infof("[cpumanager] takeByTopology: claiming CPU [%d]", c)
  if acc.needs(1) {
   acc.take(cpuset.NewCPUSet(c))
  }
  if acc.isSatisfied() {
   return acc.result, nil
  }
 }

 return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
}

发现 CPU Topology

参考代码:vendor/github.com/google/cadvisor/info/v1/machine.go

type MachineInfo struct {
 // The number of cores in this machine.
 NumCores int `json:"num_cores"`

 ...

 // Machine Topology
 // Describes cpu/memory layout and hierarchy.
 Topology []Node `json:"topology"`

 ...
}

type Node struct {
 Id int `json:"node_id"`
 // Per-node memory
 Memory uint64  `json:"memory"`
 Cores  []Core  `json:"cores"`
 Caches []Cache `json:"caches"`
}

cAdvisor 通过 GetTopology 完成 cpu 拓普信息生成,主要是读取宿主机上 /proc/cpuinfo 中信息来渲染 CPU Topology,通过读取 /sys/devices/system/cpu/cpu 来获得 cpu cache 信息。

参考代码:vendor/github.com/google/cadvisor/info/v1/machine.go

func GetTopology(sysFs sysfs.SysFs, cpuinfo string) ([]info.Node, int, error) {
 nodes := []info.Node{}

 ...
 return nodes, numCores, nil
}

创建 pod 过程

对于前面提到的 static policy 情况下 Container 如何创建呢?kubelet 会为其选择约定的 cpu affinity 来为其选择最佳的 CPU Set。

Container 的创建时 CPU Manager 工作流程大致下:

  1. Kuberuntime 调用容器运行时去创建容器。
  2. Kuberuntime 将容器传递给 CPU Manager 处理。
  3. CPU Manager 为 Container 按照静态策略进行处理。
  4. CPU Manager 从当前 Shared Pool 中选择"最佳"Set 拓结构的 CPU,对于不满 Static Policy 的 Contianer,则返回 Shared Pool 中所有 CPU 组合的 Set。
  5. CPU Manager 将针对容器的 CPUs 分配情况记录到 Checkpoint State 中,并从 Shared Pool 中删除刚刚分配的 CPUs。
  6. CPU Manager 再从 state 中读取该 Container 的 CPU 分配信息,然后通过 UpdateContainerResources cRI 接口将其更新到 Cpuset Cgroups 中,包例如对于非 Static Policy Container。
  7. Kuberuntime 调用容器运行时启动该容器。

参考代码:pkg/kubelet/cm/cpumanager/cpu_manager.go

func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
 m.Lock()
 defer m.Unlock()
 if cset, exists := m.state.GetCPUSet(string(pod.UID), container.Name); exists {
  m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
 }
 m.containerMap.Add(string(pod.UID), container.Name, containerID)
}

参考代码:pkg/kubelet/cm/cpumanager/policy_static.go

func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
 opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
 if err != nil {
  return nil, err
 }

 klog.InfoS("Static policy created with configuration", "options", opts)

 policy := &staticPolicy{
  topology:    topology,
  affinity:    affinity,
  cpusToReuse: make(map[string]cpuset.CPUSet),
  options:     opts,
 }

 allCPUs := topology.CPUDetails.CPUs()
 var reserved cpuset.CPUSet
 if reservedCPUs.Size() > 0 {
  reserved = reservedCPUs
 } else {
  // takeByTopology allocates CPUs associated with low-numbered cores from
  // allCPUs.
  //
  // For example: Given a system with 8 CPUs available and HT enabled,
  // if numReservedCPUs=2, then reserved={0,4}
  reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
 }

 if reserved.Size() != numReservedCPUs {
  err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
  return nil, err
 }

 klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved)
 policy.reserved = reserved

 return policy, nil
}

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
 if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
  klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
  // container belongs in an exclusively allocated pool

  if p.options.FullPhysicalCPUsOnly && ((numCPUs % p.topology.CPUsPerCore()) != 0) {
   // Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted
   // if the CPU requested is a multiple of the number of virtual cpus per physical cores.
   // In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put
   // in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores
   // and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
   // the pod would be placed on a node where there are enough physical cores available to be allocated.
   // Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket
   // and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all
   // CPUs on a physical core. Allocation of individual threads would never have to occur.
   return SMTAlignmentError{
    RequestedCPUs: numCPUs,
    CpusPerCore:   p.topology.CPUsPerCore(),
   }
  }
  if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
   p.updateCPUsToReuse(pod, container, cpuset)
   klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
   return nil
  }

  // Call Topology Manager to get the aligned socket affinity across all hint providers.
  hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
  klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

  // Allocate CPUs according to the NUMA affinity contained in the hint.
  cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
  if err != nil {
   klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
   return err
  }
  s.SetCPUSet(string(pod.UID), container.Name, cpuset)
  p.updateCPUsToReuse(pod, container, cpuset)

 }
 // container belongs in the shared pool (nothing to do; use default cpuset)
 return nil
}

func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
 klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)

 allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs)

 // If there are aligned CPUs in numaAffinity, attempt to take those first.
 result := cpuset.NewCPUSet()
 if numaAffinity != nil {
  alignedCPUs := cpuset.NewCPUSet()
  for _, numaNodeID := range numaAffinity.GetBits() {
   alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
  }

  numAlignedToAlloc := alignedCPUs.Size()
  if numCPUs < numAlignedToAlloc {
   numAlignedToAlloc = numCPUs
  }

  alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
  if err != nil {
   return cpuset.NewCPUSet(), err
  }

  result = result.Union(alignedCPUs)
 }

 // Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
 remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
 if err != nil {
  return cpuset.NewCPUSet(), err
 }
 result = result.Union(remainingCPUs)

 // Remove allocated CPUs from the shared CPUSet.
 s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

 klog.InfoS("AllocateCPUs", "result", result)
 return result, nil
}

删除 pod 过程

当这些通过 CPU Managers 分配 CPUs 的 Container 要删除时,CPU Manager 工作流大致如下:

  1. Kuberuntime 会调用 CPU Manager 去按静态策略中定义分发处理。
  2. CPU Manager 将容器分配的 Cpu Set 重新归还到 Shared Pool 中。
  3. Kuberuntime 调用容器运行时移除该容器。
  4. CPU Manager 会异步进行协调循环,为使用共享池中的 Cpus 容器更新 CPU 集合。

参考代码:pkg/kubelet/cm/cpumanager/cpu_manager.go

func (m *manager) RemoveContainer(containerID string) error {
 m.Lock()
 defer m.Unlock()

 err := m.policyRemoveContainerByID(containerID)
 if err != nil {
  klog.ErrorS(err, "RemoveContainer error")
  return err
 }

 return nil
}

参考代码:pkg/kubelet/cm/cpumanager/policy_static.go

func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
 klog.InfoS("Static policy: RemoveContainer", "podUID", podUID, "containerName", containerName)
 if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
  s.Delete(podUID, containerName)
  // Mutate the shared pool, adding released cpus.
  s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
 }
 return nil
}

处理方法

知道了异常的原因和以及具体原因,解决办法也非常好弄就两步:

  1. 删除原有 cpu_manager_state 文件
  2. 重启 kubelet
赞(0)
未经允许不得转载:工具盒子 » k8s 节点 CPU 升级,导致 kubelet 无法启动故障一例