intel-device-plugins-for-ku.../cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go
Tuomas Katila db7e5bfc55 Add support for gas-container-tiles annotation
Adds functionality to convert a container's tile annotation
into the corresponding L0 affinity mask. This helps to target
the container's workload to specific L0 subdevices.

Signed-off-by: Tuomas Katila <tuomas.katila@intel.com>
2022-03-24 14:13:35 +02:00


// Copyright 2021 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rm

import (
    "context"
    "os"
    "sort"
    "strconv"
    "strings"
    "sync"
    "time"

    dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
    "github.com/pkg/errors"
    "google.golang.org/grpc"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/klog/v2"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
    "k8s.io/kubernetes/pkg/kubelet/apis/podresources"
)

const (
    gasTSAnnotation   = "gas-ts"
    gasCardAnnotation = "gas-container-cards"
    gasTileAnnotation = "gas-container-tiles"

    levelZeroAffinityMaskEnvVar = "ZE_AFFINITY_MASK"

    grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
    grpcBufferSize = 4 * 1024 * 1024
    grpcTimeout    = 5 * time.Second

    retryTimeout = 1 * time.Second
)
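
// Illustrative examples (not from the upstream file) of the scheduler-written
// annotation values that this resource manager parses:
//
//    gas-container-cards: "card0,card1|card2"        cards per gpu-using container, containers separated by "|"
//    gas-container-tiles: "card0:gt0+gt1|card2:gt1"   card:tile assignments per gpu-using container
//
// The tile annotation is converted into a ZE_AFFINITY_MASK value for the container.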

// Errors.
type retryErr struct{}
type zeroPendingErr struct{}

func (e *retryErr) Error() string {
    return "things didn't work out, but perhaps a retry will help"
}

func (e *zeroPendingErr) Error() string {
    return "there are no pending pods anymore in this node"
}

// podCandidate is a pending pod that might be the one the current allocation belongs to.
type podCandidate struct {
    pod                 *v1.Pod
    name                string
    allocatedContainers int
    allocationTargetNum int
}

// DeviceInfo holds the device specs, mounts and env variables for one device.
type DeviceInfo struct {
    envs   map[string]string
    nodes  []pluginapi.DeviceSpec
    mounts []pluginapi.Mount
}

// getClientFunc is the signature for functions returning a podresources lister client.
type getClientFunc func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error)

// ResourceManager interface for the fractional resource handling.
type ResourceManager interface {
    ReallocateWithFractionalResources(*pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
    SetDevInfos(DeviceInfoMap)
}

type resourceManager struct {
    deviceInfos      DeviceInfoMap
    nodeName         string
    clientset        kubernetes.Interface
    prGetClientFunc  getClientFunc
    skipID           string
    fullResourceName string
    mutex            sync.RWMutex // for devTree updates during scan
}

// NewDeviceInfo creates a new DeviceInfo.
func NewDeviceInfo(nodes []pluginapi.DeviceSpec, mounts []pluginapi.Mount, envs map[string]string) *DeviceInfo {
    return &DeviceInfo{
        nodes:  nodes,
        mounts: mounts,
        envs:   envs,
    }
}

// DeviceInfoMap is a map of device infos. deviceId -> *DeviceInfo.
type DeviceInfoMap map[string]*DeviceInfo

// NewDeviceInfoMap creates a new DeviceInfoMap.
func NewDeviceInfoMap() DeviceInfoMap {
    return DeviceInfoMap{}
}

// NewResourceManager creates a new resource manager.
func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
    clientset, err := getClientset()
    if err != nil {
        return nil, errors.Wrap(err, "couldn't get clientset")
    }

    rm := resourceManager{
        nodeName:         os.Getenv("NODE_NAME"),
        clientset:        clientset,
        skipID:           skipID,
        fullResourceName: fullResourceName,
        prGetClientFunc:  podresources.GetV1Client,
    }

    klog.Info("GPU device plugin resource manager enabled")

    return &rm, nil
}

// ReallocateWithFractionalResources runs the fractional resource logic.
// If any error is hit, it is intentionally only logged and UseDefaultMethodError
// is returned, to avoid clusters filling up with unexpected admission errors.
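// The overall flow, summarized here for readability (not part of the upstream
// comment): resolve which pending pod this Allocate call belongs to, read that
// pod's gas-container-cards and gas-container-tiles annotations for the container
// in question, and build the response from the stored device infos.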
func (rm *resourceManager) ReallocateWithFractionalResources(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    if !isInputOk(request, rm.skipID) {
        // it is better to leave allocated gpu devices as is and return
        return nil, &dpapi.UseDefaultMethodError{}
    }

    podCandidate, err := rm.findAllocationPodCandidate()
    if _, ok := err.(*retryErr); ok {
        klog.Warning("retrying POD resolving after sleeping")
        time.Sleep(retryTimeout)

        podCandidate, err = rm.findAllocationPodCandidate()
    }

    if err != nil {
        if _, ok := err.(*zeroPendingErr); !ok {
            klog.Error("allocation candidate not found, perhaps the GPU scheduler extender is not called, err:", err)
        }

        // it is better to leave allocated gpu devices as is and return
        return nil, &dpapi.UseDefaultMethodError{}
    }

    pod := podCandidate.pod
    cards := containerCards(pod, podCandidate.allocatedContainers)
    affinityMask := containerTileAffinityMask(pod, podCandidate.allocatedContainers)

    return rm.createAllocateResponse(cards, affinityMask)
}

// isInputOk checks that the allocation request has exactly one container request
// and does not include the skipped device id.
func isInputOk(rqt *pluginapi.AllocateRequest, skipID string) bool {
    // so far kubelet calls allocate for each container separately. If that changes, we need to refine our logic.
    if len(rqt.ContainerRequests) != 1 {
        klog.Warning("multi-container allocation request not supported")
        return false
    }

    crqt := rqt.ContainerRequests[0]
    for _, id := range crqt.DevicesIDs {
        if id == skipID {
            return false // intentionally not printing anything, this request is skipped
        }
    }

    return true
}

// findAllocationPodCandidate tries to find the best allocation candidate pod, which must be:
//  - pending for this node
//  - using GPU resources in its spec
//  - found via the podresources grpc service with unallocated GPU devices
//
// returns:
//  - the candidate pod struct pointer and no error, or
//  - retryErr if unsuccessful, but there is perhaps hope of trying again with better luck
//  - zeroPendingErr if no pending pods exist anymore (which is fine)
//  - any grpc communication errors
func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
    // get map of pending pods for this node
    pendingPods, err := rm.getNodePendingGPUPods()
    if err != nil {
        return nil, err
    }

    candidates, err := rm.findAllocationPodCandidates(pendingPods)
    if err != nil {
        return nil, err
    }

    numCandidates := len(candidates)
    switch numCandidates {
    case 0:
        // fine, this typically happens when deployment is deleted before PODs start
        klog.V(4).Info("zero pending pods")
        return nil, &zeroPendingErr{}
    case 1:
        // perfect, only one option
        klog.V(4).Info("only one pending pod")

        if _, ok := candidates[0].pod.Annotations[gasCardAnnotation]; !ok {
            klog.Warningf("Pending POD annotations from scheduler not yet visible for pod %q", candidates[0].pod.Name)
            return nil, &retryErr{}
        }

        return &candidates[0], nil
    default: // > 1 candidates, not good, need to pick the best
        // look for scheduler timestamps and sort by them
        klog.V(4).Infof("%v pods pending, picking oldest", numCandidates)

        timestampedCandidates := []podCandidate{}
        for _, candidate := range candidates {
            if _, ok := pendingPods[candidate.name].Annotations[gasTSAnnotation]; ok {
                timestampedCandidates = append(timestampedCandidates, candidate)
            }
        }

        sort.Slice(timestampedCandidates,
            func(i, j int) bool {
                return pendingPods[timestampedCandidates[i].name].Annotations[gasTSAnnotation] <
                    pendingPods[timestampedCandidates[j].name].Annotations[gasTSAnnotation]
            })

        if len(timestampedCandidates) == 0 {
            klog.Warning("Pending POD annotations from scheduler not yet visible")
            return nil, &retryErr{}
        }

        return &timestampedCandidates[0], nil
    }
}

// +kubebuilder:rbac:groups="",resources=pods,verbs=list
// getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
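// The field selector used for listing is of the form (node name illustrative):
//
//    spec.nodeName=worker-1,status.phase=Pending
//
// so only pods scheduled to this node and still pending are inspected.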
func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
    selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
        ",status.phase=" + string(v1.PodPending))
    if err != nil {
        return nil, errors.Wrap(err, "unable to parse selector")
    }

    pendingPodList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
        FieldSelector: selector.String(),
    })
    if err != nil {
        return nil, errors.Wrap(err, "unable to list pods")
    }

    // make a map out of the list, accept only GPU-using pods
    pendingPods := make(map[string]*v1.Pod)

    for i := range pendingPodList.Items {
        pod := &pendingPodList.Items[i]

        if numGPUUsingContainers(pod, rm.fullResourceName) > 0 {
            pendingPods[pod.Name] = pod
        }
    }

    return pendingPods, nil
}

// findAllocationPodCandidates returns a slice of all potential allocation candidate pods.
// This goes through the PODs listed in the podresources grpc service and finds those among pending
// pods which don't have all GPU devices allocated.
func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v1.Pod) ([]podCandidate, error) {
    resListerClient, clientConn, err := rm.prGetClientFunc(grpcAddress, grpcTimeout, grpcBufferSize)
    if err != nil {
        return nil, errors.Wrap(err, "Could not get a grpc client for reading plugin resources")
    }

    defer clientConn.Close()

    ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
    defer cancel()

    resp, err := resListerClient.List(ctx, &podresourcesv1.ListPodResourcesRequest{})
    if err != nil {
        return nil, errors.Wrap(err, "Could not read plugin resources via grpc")
    }

    candidates := []podCandidate{}

    for _, podRes := range resp.PodResources {
        // count allocated gpu-using containers
        numContainersAllocated := 0

        for _, cont := range podRes.Containers {
            for _, dev := range cont.Devices {
                if dev.ResourceName == rm.fullResourceName {
                    numContainersAllocated++
                    break
                }
            }
        }

        if pod, pending := pendingPods[podRes.Name]; pending {
            allocationTargetNum := numGPUUsingContainers(pod, rm.fullResourceName)
            if numContainersAllocated < allocationTargetNum {
                candidate := podCandidate{
                    pod:                 pod,
                    name:                podRes.Name,
                    allocatedContainers: numContainersAllocated,
                    allocationTargetNum: allocationTargetNum,
                }
                candidates = append(candidates, candidate)
            }
        }
    }

    return candidates, nil
}

// SetDevInfos updates the device info map used when building allocate responses.
func (rm *resourceManager) SetDevInfos(deviceInfos DeviceInfoMap) {
    rm.mutex.Lock()
    defer rm.mutex.Unlock()
    rm.deviceInfos = deviceInfos
}

// createAllocateResponse builds an allocate response from the stored device infos
// of the given cards and sets the Level Zero affinity mask env variable if a
// tile affinity mask was resolved.
func (rm *resourceManager) createAllocateResponse(cards []string, tileAffinityMask string) (*pluginapi.AllocateResponse, error) {
    rm.mutex.Lock()
    defer rm.mutex.Unlock()

    allocateResponse := pluginapi.AllocateResponse{}
    cresp := pluginapi.ContainerAllocateResponse{}

    for _, card := range cards {
        // device infos are stored per device plugin device id, e.g. "card0-0"
        newDeviceID := card + "-0"

        dev, ok := rm.deviceInfos[newDeviceID]
        if !ok {
            klog.Warningf("No device info for %q, using default allocation method devices", newDeviceID)
            return nil, &dpapi.UseDefaultMethodError{}
        }

        // add new devices
        nodes := dev.nodes
        for i := range nodes {
            cresp.Devices = append(cresp.Devices, &nodes[i])
        }

        // add new mounts
        mounts := dev.mounts
        for i := range mounts {
            cresp.Mounts = append(cresp.Mounts, &mounts[i])
        }

        for key, value := range dev.envs {
            if cresp.Envs == nil {
                cresp.Envs = make(map[string]string)
            }

            cresp.Envs[key] = value
        }
    }

    if tileAffinityMask != "" {
        if cresp.Envs == nil {
            cresp.Envs = make(map[string]string)
        }

        cresp.Envs[levelZeroAffinityMaskEnvVar] = tileAffinityMask
    }

    allocateResponse.ContainerResponses = append(allocateResponse.ContainerResponses, &cresp)

    return &allocateResponse, nil
}

func numGPUUsingContainers(pod *v1.Pod, fullResourceName string) int {
    num := 0

    for _, container := range pod.Spec.Containers {
        for reqName, quantity := range container.Resources.Requests {
            resourceName := reqName.String()
            if resourceName == fullResourceName {
                value, _ := quantity.AsInt64()
                if value > 0 {
                    num++
                    break
                }
            }
        }
    }

    return num
}

// containerCards returns the cards to use for a single container.
// gpuUsingContainerIndex 0 == first gpu-using container in the pod.
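// Illustrative example (annotation value not from the upstream file): with
//
//    gas-container-cards: "card0,card1|card2"
//
// the first gpu-using container gets ["card0", "card1"] and the second gets ["card2"].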
func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string {
    fullAnnotation := pod.Annotations[gasCardAnnotation]
    cardLists := strings.Split(fullAnnotation, "|")
    klog.V(3).Infof("%s:%v", fullAnnotation, cardLists)

    i := 0
    for _, cardList := range cardLists {
        cards := strings.Split(cardList, ",")
        if len(cards) > 0 && len(cardList) > 0 {
            if gpuUsingContainerIndex == i {
                klog.V(3).Infof("Cards for container nr %v in pod %v are %v", gpuUsingContainerIndex, pod.Name, cards)
                return cards
            }

            i++
        }
    }

    klog.Warningf("couldn't find cards for gpu using container index %v", gpuUsingContainerIndex)

    return nil
}

// convertTileInfoToEnvMask converts one container's card:tile annotation entry
// (e.g. "card0:gt0+gt1,card1:gt0") into a Level Zero affinity mask where each
// entry is <card list index>.<tile index> (e.g. "0.0,0.1,1.0").
func convertTileInfoToEnvMask(tileInfo string) string {
    cardTileList := strings.Split(tileInfo, ",")

    var tileIndices []string

    for i, cardList := range cardTileList {
        cards := strings.Split(cardList, ",")

        for _, cardTileCombos := range cards {
            cardTileSplit := strings.Split(cardTileCombos, ":")
            if len(cardTileSplit) < 2 {
                klog.Warningf("invalid card tile combo string (%v)", cardTileCombos)
                return ""
            }

            tiles := strings.Split(cardTileSplit[1], "+")

            var combos []string

            for _, tile := range tiles {
                tileNoStr := strings.TrimPrefix(tile, "gt")

                tileNo, err := strconv.ParseInt(tileNoStr, 10, 16)
                if err != nil {
                    continue
                }

                levelZeroCardTileCombo :=
                    strconv.FormatInt(int64(i), 10) + "." +
                        strconv.FormatInt(int64(tileNo), 10)

                combos = append(combos, levelZeroCardTileCombo)
            }

            tileIndices = append(tileIndices, strings.Join(combos, ","))
        }
    }

    return strings.Join(tileIndices, ",")
}

// containerTileAffinityMask returns the Level Zero affinity mask to use for a single container.
// The indices are passed to the container via the Level Zero affinity env variable to guide execution.
// gpuUsingContainerIndex 0 == first gpu-using container in the pod.
// annotation example:
//
//    gas-container-tiles=card0:gt0+gt1,card1:gt0|card2:gt1+gt2||card0:gt3
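//
// With that example annotation, the gpu-using containers get the masks
// "0.0,0.1,1.0", "0.1,0.2" and "0.3", in order (empty entries between "|"
// separators are skipped). The values are derived here for illustration.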
func containerTileAffinityMask(pod *v1.Pod, gpuUsingContainerIndex int) string {
    fullAnnotation := pod.Annotations[gasTileAnnotation]
    if fullAnnotation == "" {
        return ""
    }

    tileLists := strings.Split(fullAnnotation, "|")
    klog.Infof("%s:%v", fullAnnotation, tileLists)

    i := 0
    for _, containerTileInfo := range tileLists {
        if len(containerTileInfo) == 0 {
            continue
        }

        if i == gpuUsingContainerIndex {
            return convertTileInfoToEnvMask(containerTileInfo)
        }

        i++
    }

    klog.Warningf("couldn't find tile info for gpu using container index %v", gpuUsingContainerIndex)

    return ""
}

func getClientset() (*kubernetes.Clientset, error) {
    config, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        return nil, err
    }

    return clientset, nil
}