// Copyright 2021-2023 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rm
import (
"context"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io"
"math/big"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
"github.com/pkg/errors"
"google.golang.org/grpc"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
sslices "k8s.io/utils/strings/slices"
)
const (
gasTSAnnotation = "gas-ts"
gasCardAnnotation = "gas-container-cards"
gasTileAnnotation = "gas-container-tiles"
LevelzeroAffinityMaskEnvVar = "ZE_AFFINITY_MASK"
levelzeroHierarchyEnvVar = "ZE_FLAT_DEVICE_HIERARCHY"
hierarchyModeComposite = "COMPOSITE"
hierarchyModeFlat = "FLAT"
hierarchyModeCombined = "COMBINED"
grpcAddress = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
grpcBufferSize = 4 * 1024 * 1024
grpcTimeout = 5 * time.Second
kubeletAPITimeout = 5 * time.Second
kubeletAPIMaxRetries = 5
kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
// The token path is incorrectly detected as a hardcoded credential
//nolint:gosec
serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
)
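// The gas-* annotations are written per pod by the GPU Aware Scheduling (GAS)
// scheduler extender. Roughly: gas-ts carries the scheduling timestamp used for ordering,
// gas-container-cards lists the cards per gpu-using container separated by "|"
// (e.g. "card0,card1|card2"), and gas-container-tiles lists the tiles per card
// in the same per-container format (e.g. "card0:gt0+gt1|card1:gt0").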
// Errors.
type retryErr struct{}
type zeroPendingErr struct{}
func (e *retryErr) Error() string {
return "things didn't work out, but perhaps a retry will help"
}
func (e *zeroPendingErr) Error() string {
return "there are no pending pods anymore in this node"
}
type podCandidate struct {
pod *v1.Pod
name string
allocatedContainerCount int
allocationTargetNum int
}
// DeviceInfo is a subset of deviceplugin.DeviceInfo.
// It is a lighter version of the full DeviceInfo, used
// to store fractional devices.
type DeviceInfo struct {
envs map[string]string
nodes []pluginapi.DeviceSpec
mounts []pluginapi.Mount
}
type getClientFunc func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error)
// ResourceManager interface for the fractional resource handling.
type ResourceManager interface {
CreateFractionalResourceResponse(*pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
GetPreferredFractionalAllocation(*pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
SetDevInfos(DeviceInfoMap)
SetTileCountPerCard(count uint64)
}
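// Illustrative call sequence from the plugin side: the device plugin calls
// GetPreferredFractionalAllocation from its GetPreferredAllocation handler and
// CreateFractionalResourceResponse from its Allocate handler, and refreshes the
// resource manager's view with SetDevInfos and SetTileCountPerCard after each device scan.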
type containerAssignments struct {
deviceIds map[string]bool
tileEnv string
}
type podAssignmentDetails struct {
containers []containerAssignments
}
type resourceManager struct {
clientset kubernetes.Interface
deviceInfos DeviceInfoMap
prGetClientFunc getClientFunc
assignments map[string]podAssignmentDetails // pod name -> assignment details
nodeName string
hostIP string
skipID string
fullResourceNames []string
retryTimeout time.Duration
cleanupInterval time.Duration
mutex sync.RWMutex // for devTree updates during scan
cleanupMutex sync.RWMutex // for assignment details during cleanup
useKubelet bool
tileCountPerCard uint64
}
// NewDeviceInfo creates a new DeviceInfo.
func NewDeviceInfo(nodes []pluginapi.DeviceSpec, mounts []pluginapi.Mount, envs map[string]string) *DeviceInfo {
return &DeviceInfo{
nodes: nodes,
mounts: mounts,
envs: envs,
}
}
// DeviceInfoMap is a map of device infos. deviceId -> *DeviceInfo.
type DeviceInfoMap map[string]*DeviceInfo
// NewDeviceInfoMap creates a new DeviceInfoMap.
func NewDeviceInfoMap() DeviceInfoMap {
return DeviceInfoMap{}
}
// NewResourceManager creates a new resource manager.
func NewResourceManager(skipID string, fullResourceNames []string) (ResourceManager, error) {
clientset, err := getClientset()
if err != nil {
return nil, errors.Wrap(err, "couldn't get clientset")
}
rm := resourceManager{
nodeName: os.Getenv("NODE_NAME"),
hostIP: os.Getenv("HOST_IP"),
clientset: clientset,
skipID: skipID,
fullResourceNames: fullResourceNames,
prGetClientFunc: podresources.GetV1Client,
assignments: make(map[string]podAssignmentDetails),
retryTimeout: 1 * time.Second,
cleanupInterval: 20 * time.Minute,
useKubelet: true,
}
klog.Info("GPU device plugin resource manager enabled")
// Try listing Pods once to detect if Kubelet API works
_, err = rm.listPodsFromKubelet()
if err != nil {
klog.V(2).Info("Not using Kubelet API")
rm.useKubelet = false
} else {
klog.V(2).Info("Using Kubelet API")
}
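// Periodic cleanup of stale assignment entries. The ticker interval is randomized
// between cleanupInterval/2 and cleanupInterval/2 + cleanupInterval (10-30 min with
// the defaults), which spreads the resulting pod listings over time.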
go func() {
getRandDuration := func() time.Duration {
cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())
n, _ := rand.Int(rand.Reader, big.NewInt(int64(cleanupIntervalSeconds)))
return rm.cleanupInterval/2 + time.Duration(n.Int64())*time.Second
}
ticker := time.NewTicker(getRandDuration())
for range ticker.C {
klog.V(4).Info("Running cleanup")
ticker.Reset(getRandDuration())
// Gather both running and pending pods. Cleanup might otherwise be triggered
// between GetPreferredAllocation and Allocate and remove the assignment data
// for the soon-to-be-allocated pod.
running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})
func() {
rm.cleanupMutex.Lock()
defer rm.cleanupMutex.Unlock()
for podName := range rm.assignments {
if _, found := running[podName]; !found {
klog.V(4).Info("Removing from assignments: ", podName)
delete(rm.assignments, podName)
}
}
}()
klog.V(4).Info("Cleanup done")
}
}()
return &rm, nil
}
// Generate a unique key for Pod.
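// For example, a pod "mypod" in namespace "default" yields "default&mypod".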
func getPodKey(pod *v1.Pod) string {
return pod.Namespace + "&" + pod.Name
}
// Generate a unique key for PodResources.
func getPodResourceKey(res *podresourcesv1.PodResources) string {
return res.Namespace + "&" + res.Name
}
func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)
if err != nil {
return &v1.PodList{}, err
}
klog.V(4).Info("Requesting pods from API server")
podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
FieldSelector: selector.String(),
})
if err != nil {
klog.Error("pod listing failed:", err)
return &v1.PodList{}, err
}
return podList, nil
}
// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get
func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
var podList v1.PodList
token, err := os.ReadFile(serviceAccountTokenPath)
if err != nil {
klog.Warning("Failed to read token for kubelet API access: ", err)
return &podList, err
}
kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
if err != nil {
klog.Warning("Failed to read kubelet cert: ", err)
return &podList, err
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(kubeletCert)
// There isn't official documentation for the kubelet API. There is a blog post:
// https://www.deepnetwork.com/blog/2020/01/13/kubelet-api.html
// And a tool to work with the API:
// https://github.com/cyberark/kubeletctl
kubeletURL := "https://" + rm.hostIP + ":10250/pods"
req, err := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
if err != nil {
klog.Warning("Failure creating new request: ", err)
return &podList, err
}
req.Header.Set("Authorization", "Bearer "+string(token))
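// The kubelet's own serving certificate (read above) acts as the root CA, and the
// connection is verified against the node name, which the kubelet certificate is
// expected to carry.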
tr := &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: certPool,
ServerName: rm.nodeName,
},
}
client := &http.Client{
Timeout: kubeletAPITimeout,
Transport: tr,
}
klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)
resp, err := client.Do(req)
if err != nil {
klog.Warning("Failed to read pods from kubelet API: ", err)
return &podList, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
klog.Warning("Failed to read http response body: ", err)
return &podList, err
}
err = json.Unmarshal(body, &podList)
if err != nil {
klog.Warning("Failed to unmarshal PodList from response: ", err)
return &podList, err
}
return &podList, nil
}
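// listPods returns the pods on this node, preferring the kubelet API and falling
// back to the API server if the kubelet API keeps failing.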
func (rm *resourceManager) listPods() (*v1.PodList, error) {
// Try to use kubelet API as long as it provides listings within retries
if rm.useKubelet {
var neterr net.Error
for i := 0; i < kubeletAPIMaxRetries; i++ {
if podList, err := rm.listPodsFromKubelet(); err == nil {
return podList, nil
} else if errors.As(err, &neterr) && neterr.Timeout() {
continue
}
// If error is non-timeout, break to stop using kubelet API
break
}
klog.Warning("Stopping Kubelet API use due to error/timeout")
rm.useKubelet = false
}
return rm.listPodsFromAPIServer()
}
func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
pods := make(map[string]*v1.Pod)
podList, err := rm.listPods()
if err != nil {
klog.Error("pod listing failed:", err)
return pods
}
for i := range podList.Items {
phase := string(podList.Items[i].Status.Phase)
if sslices.Contains(states, phase) {
key := getPodKey(&podList.Items[i])
pods[key] = &podList.Items[i]
}
}
return pods
}
// CreateFractionalResourceResponse returns an allocate response with the details
// assigned in GetPreferredFractionalAllocation.
// If any errors are hit, this intentionally only logs them and returns
// UseDefaultMethodError, to avoid filling the cluster with unexpected admission errors.
func (rm *resourceManager) CreateFractionalResourceResponse(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
if !isAllocateRequestOk(request, rm.skipID) {
// it is better to leave allocated gpu devices as is and return
return nil, &dpapi.UseDefaultMethodError{}
}
klog.V(4).Info("Proposed device ids: ", request.ContainerRequests[0].DevicesIDs)
podCandidate, err := rm.findAllocationPodCandidate()
if errors.Is(err, &retryErr{}) {
klog.Warning("retrying POD resolving after sleeping")
time.Sleep(rm.retryTimeout)
podCandidate, err = rm.findAllocationPodCandidate()
}
if err != nil {
if !errors.Is(err, &zeroPendingErr{}) {
klog.Error("allocation candidate not found, perhaps the GPU scheduler extender is not called, err:", err)
}
// it is better to leave allocated gpu devices as is and return
return nil, &dpapi.UseDefaultMethodError{}
}
pod := podCandidate.pod
rm.cleanupMutex.Lock()
assignment, found := rm.assignments[getPodKey(pod)]
if !found {
rm.cleanupMutex.Unlock()
klog.Error("couldn't find allocation info from assignments:", getPodKey(pod))
return nil, &dpapi.UseDefaultMethodError{}
}
containerIndex := podCandidate.allocatedContainerCount
affinityMask := assignment.containers[containerIndex].tileEnv
getPrefDevices := assignment.containers[containerIndex].deviceIds
rm.cleanupMutex.Unlock()
devIds := request.ContainerRequests[0].DevicesIDs
// Check if all the preferred devices were also used
if len(devIds) != len(getPrefDevices) {
klog.Warningf("Allocate called with odd number of device IDs: %d vs %d", len(devIds), len(getPrefDevices))
}
for _, devID := range devIds {
if _, found := getPrefDevices[devID]; !found {
klog.Warningf("Not preferred device used in Allocate: %s (%v)", devID, getPrefDevices)
}
}
klog.V(4).Info("Allocate affinity mask: ", affinityMask)
klog.V(4).Info("Allocate device ids: ", devIds)
return rm.createAllocateResponse(devIds, affinityMask)
}
func (rm *resourceManager) GetPreferredFractionalAllocation(request *pluginapi.PreferredAllocationRequest) (
*pluginapi.PreferredAllocationResponse, error) {
if !isPreferredAllocationRequestOk(request, rm.skipID) {
// it is better to leave allocated gpu devices as is and return
return &pluginapi.PreferredAllocationResponse{}, nil
}
klog.V(4).Info("GetPreferredAllocation request: ", request)
podCandidate, err := rm.findAllocationPodCandidate()
if errors.Is(err, &retryErr{}) {
klog.Warning("retrying POD resolving after sleeping")
time.Sleep(rm.retryTimeout)
podCandidate, err = rm.findAllocationPodCandidate()
}
if err != nil {
if !errors.Is(err, &zeroPendingErr{}) {
klog.Error("allocation candidate not found, perhaps the GPU scheduler extender is not called, err:", err)
}
// Return empty response as returning an error causes
// the pod to be labeled as UnexpectedAdmissionError
return &pluginapi.PreferredAllocationResponse{}, nil
}
pod := podCandidate.pod
containerIndex := podCandidate.allocatedContainerCount
cards := containerCards(pod, containerIndex)
affinityMask := containerTileAffinityMask(pod, containerIndex, int(rm.tileCountPerCard))
podKey := getPodKey(pod)
creq := request.ContainerRequests[0]
klog.V(4).Info("Get preferred fractional allocation: ",
podKey, creq.AllocationSize, creq.MustIncludeDeviceIDs, creq.AvailableDeviceIDs)
deviceIds := selectDeviceIDsForContainer(
int(creq.AllocationSize), cards, creq.AvailableDeviceIDs, creq.MustIncludeDeviceIDs)
// Map container assignment details per pod name
rm.cleanupMutex.Lock()
assignments, found := rm.assignments[podKey]
if !found {
assignments.containers = make([]containerAssignments, podCandidate.allocationTargetNum)
}
assignments.containers[containerIndex].tileEnv = affinityMask
// Store device ids so we can double check the ones in Allocate
assignments.containers[containerIndex].deviceIds = make(map[string]bool)
for _, devID := range deviceIds {
assignments.containers[containerIndex].deviceIds[devID] = true
}
rm.assignments[podKey] = assignments
rm.cleanupMutex.Unlock()
klog.V(4).Info("Selected devices for container: ", deviceIds)
response := pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: deviceIds},
},
}
return &response, nil
}
// selectDeviceIDsForContainer selects suitable device ids from deviceIds and mustHaveDeviceIds.
// The selection is guided by the cards list.
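// Illustrative example: with cards=["card0","card0","card1"], requestedCount=3 and
// deviceIds=["card0-0","card0-1","card1-0"], the selection would be
// ["card0-0","card0-1","card1-0"]: each occurrence of a card in the cards list
// consumes the next unused device id for that card.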
func selectDeviceIDsForContainer(requestedCount int, cards, deviceIds, mustHaveDeviceIds []string) []string {
getBaseCard := func(deviceId string) string {
return strings.Split(deviceId, "-")[0]
}
if requestedCount < len(cards) {
klog.Warningf("Requested count is less than card count: %d vs %d.", requestedCount, len(cards))
cards = cards[0:requestedCount]
}
if requestedCount > len(cards) {
klog.Warningf("Requested count is higher than card count: %d vs %d.", requestedCount, len(cards))
}
// map of cardX -> device id list
available := map[string][]string{}
// Keep the last used index so we can pick the next one
availableIndex := map[string]int{}
// Place must have IDs first so they get used
for _, devID := range mustHaveDeviceIds {
baseCard := getBaseCard(devID)
available[baseCard] = append(available[baseCard], devID)
}
for _, devID := range deviceIds {
baseCard := getBaseCard(devID)
available[baseCard] = append(available[baseCard], devID)
}
selected := []string{}
for _, card := range cards {
indexNow := availableIndex[card]
availableDevices, found := available[card]
if !found {
klog.Warningf("card %s is not found from known devices: %v", card, available)
continue
}
if indexNow < len(availableDevices) {
selected = append(selected, availableDevices[indexNow])
indexNow++
availableIndex[card] = indexNow
}
}
return selected
}
func isAllocateRequestOk(rqt *pluginapi.AllocateRequest, skipID string) bool {
// so far kubelet calls allocate for each container separately. If that changes, we need to refine our logic.
if len(rqt.ContainerRequests) != 1 {
klog.Warning("multi-container allocation request not supported")
return false
}
crqt := rqt.ContainerRequests[0]
for _, id := range crqt.DevicesIDs {
if id == skipID {
return false // intentionally not printing anything, this request is skipped
}
}
return true
}
func isPreferredAllocationRequestOk(rqt *pluginapi.PreferredAllocationRequest, skipID string) bool {
// so far kubelet calls allocate for each container separately. If that changes, we need to refine our logic.
if len(rqt.ContainerRequests) != 1 {
klog.Warning("multi-container allocation request not supported")
return false
}
crqt := rqt.ContainerRequests[0]
for _, id := range crqt.AvailableDeviceIDs {
if id == skipID {
return false // intentionally not printing anything, this request is skipped
}
}
return true
}
// findAllocationPodCandidate tries to find the best allocation candidate pod, which must be:
//
// -pending for this node
// -using GPU resources in its spec
// -is found via grpc service with unallocated GPU devices
//
// returns:
//
// -the candidate pod struct pointer and no error, or
// -retryErr if unsuccessful, but there is perhaps hope of trying again with better luck
// -zeroPendingErr if no pending pods exist anymore (which is fine)
// -any grpc communication errors
func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
// get map of pending pods for this node
pendingPods, err := rm.getNodePendingGPUPods()
if err != nil {
return nil, err
}
candidates, err := rm.findAllocationPodCandidates(pendingPods)
if err != nil {
return nil, err
}
numCandidates := len(candidates)
switch numCandidates {
case 0:
// fine, this typically happens when deployment is deleted before PODs start
klog.V(4).Info("zero pending pods")
return nil, &zeroPendingErr{}
case 1:
// perfect, only one option
klog.V(4).Info("only one pending pod")
if _, ok := candidates[0].pod.Annotations[gasCardAnnotation]; !ok {
klog.Warningf("Pending POD annotations from scheduler not yet visible for pod %q", candidates[0].pod.Name)
return nil, &retryErr{}
}
return &candidates[0], nil
default: // > 1 candidates, not good, need to pick the best
// look for scheduler timestamps and sort by them
klog.V(4).Infof("%v pods pending, picking oldest", numCandidates)
timestampedCandidates := []podCandidate{}
for _, candidate := range candidates {
if _, ok := pendingPods[candidate.name].Annotations[gasTSAnnotation]; ok {
timestampedCandidates = append(timestampedCandidates, candidate)
}
}
// .name here refers to a namespace+name combination
sort.Slice(timestampedCandidates,
func(i, j int) bool {
return pendingPods[timestampedCandidates[i].name].Annotations[gasTSAnnotation] <
pendingPods[timestampedCandidates[j].name].Annotations[gasTSAnnotation]
})
if len(timestampedCandidates) == 0 {
klog.Warning("Pending POD annotations from scheduler not yet visible")
return nil, &retryErr{}
}
return &timestampedCandidates[0], nil
}
}
// +kubebuilder:rbac:groups="",resources=pods,verbs=list
// getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})
for podName, pod := range pendingPods {
if numGPUUsingContainers(pod, rm.fullResourceNames) == 0 {
delete(pendingPods, podName)
}
}
return pendingPods, nil
}
// findAllocationPodCandidates returns a slice of all potential allocation candidate pods.
// This goes through the PODs listed in the podresources grpc service and finds those among pending
// pods which don't have all GPU devices allocated.
func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v1.Pod) ([]podCandidate, error) {
resListerClient, clientConn, err := rm.prGetClientFunc(grpcAddress, grpcTimeout, grpcBufferSize)
if err != nil {
return nil, errors.Wrap(err, "Could not get a grpc client for reading plugin resources")
}
defer clientConn.Close()
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()
resp, err := resListerClient.List(ctx, &podresourcesv1.ListPodResourcesRequest{})
if err != nil {
return nil, errors.Wrap(err, "Could not read plugin resources via grpc")
}
candidates := []podCandidate{}
for _, podRes := range resp.PodResources {
// count allocated gpu-using containers
numContainersAllocated := 0
for _, cont := range podRes.Containers {
for _, dev := range cont.Devices {
if sslices.Contains(rm.fullResourceNames, dev.ResourceName) {
numContainersAllocated++
break
}
}
}
key := getPodResourceKey(podRes)
if pod, pending := pendingPods[key]; pending {
allocationTargetNum := numGPUUsingContainers(pod, rm.fullResourceNames)
if numContainersAllocated < allocationTargetNum {
candidate := podCandidate{
pod: pod,
name: key,
allocatedContainerCount: numContainersAllocated,
allocationTargetNum: allocationTargetNum,
}
candidates = append(candidates, candidate)
}
}
}
return candidates, nil
}
func (rm *resourceManager) SetDevInfos(deviceInfos DeviceInfoMap) {
rm.mutex.Lock()
defer rm.mutex.Unlock()
rm.deviceInfos = deviceInfos
}
func (rm *resourceManager) SetTileCountPerCard(count uint64) {
rm.mutex.Lock()
defer rm.mutex.Unlock()
rm.tileCountPerCard = count
}
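// createAllocateResponse builds a single-container allocate response from the stored
// per-device nodes, mounts and env variables, and sets the Level-Zero affinity mask
// env variable when a tile affinity mask was assigned.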
func (rm *resourceManager) createAllocateResponse(deviceIds []string, tileAffinityMask string) (*pluginapi.AllocateResponse, error) {
rm.mutex.Lock()
defer rm.mutex.Unlock()
allocateResponse := pluginapi.AllocateResponse{}
cresp := pluginapi.ContainerAllocateResponse{}
for _, devID := range deviceIds {
dev, ok := rm.deviceInfos[devID]
if !ok {
klog.Warningf("No device info for %q, using default allocation method devices", devID)
return nil, &dpapi.UseDefaultMethodError{}
}
// add new devices
nodes := dev.nodes
for i := range nodes {
cresp.Devices = append(cresp.Devices, &nodes[i])
}
// add new mounts
mounts := dev.mounts
for i := range mounts {
cresp.Mounts = append(cresp.Mounts, &mounts[i])
}
for key, value := range dev.envs {
if cresp.Envs == nil {
cresp.Envs = make(map[string]string)
}
cresp.Envs[key] = value
}
}
if tileAffinityMask != "" {
if cresp.Envs == nil {
cresp.Envs = make(map[string]string)
}
cresp.Envs[LevelzeroAffinityMaskEnvVar] = tileAffinityMask
}
allocateResponse.ContainerResponses = append(allocateResponse.ContainerResponses, &cresp)
return &allocateResponse, nil
}
func numGPUUsingContainers(pod *v1.Pod, fullResourceNames []string) int {
num := 0
for _, container := range pod.Spec.Containers {
for reqName, quantity := range container.Resources.Requests {
resourceName := reqName.String()
if sslices.Contains(fullResourceNames, resourceName) {
value, _ := quantity.AsInt64()
if value > 0 {
num++
break
}
}
}
}
return num
}
// containerCards returns the cards to use for a single container.
// gpuUsingContainerIndex 0 == first gpu-using container in the pod.
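// For example (illustrative), annotation "card0,card1|card2" gives ["card0","card1"]
// for index 0 and ["card2"] for index 1.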
func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string {
fullAnnotation := pod.Annotations[gasCardAnnotation]
cardLists := strings.Split(fullAnnotation, "|")
klog.V(3).Infof("%s:%v", fullAnnotation, cardLists)
i := 0
for _, cardList := range cardLists {
cards := strings.Split(cardList, ",")
if len(cards) > 0 && len(cardList) > 0 {
if gpuUsingContainerIndex == i {
klog.V(3).Infof("Cards for container nr %v in pod %v are %v", gpuUsingContainerIndex, getPodKey(pod), cards)
return cards
}
i++
}
}
klog.Warningf("couldn't find cards for gpu using container index %v", gpuUsingContainerIndex)
return nil
}
// Guesses the Level-Zero hierarchy mode for the container. Defaults to the newer
// FLAT mode if no valid mode is set in the container's env variables.
func guessLevelzeroHierarchyMode(pod *v1.Pod, containerIndex int) string {
klog.V(4).Infof("Checking pod %s envs", pod.Name)
if containerIndex < len(pod.Spec.Containers) {
c := pod.Spec.Containers[containerIndex]
if c.Env != nil {
for _, env := range c.Env {
if env.Name == levelzeroHierarchyEnvVar {
switch env.Value {
// Check that the value is valid.
case hierarchyModeComposite:
fallthrough
case hierarchyModeFlat:
fallthrough
case hierarchyModeCombined:
klog.V(4).Infof("Returning %s hierarchy", env.Value)
return env.Value
}
break
}
}
}
}
klog.V(4).Infof("Returning default %s hierarchy", hierarchyModeFlat)
return hierarchyModeFlat
}
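// convertTileInfoToEnvMask converts one container's tile annotation into a
// Level-Zero affinity mask. Illustrative example with tileInfo "card1:gt0+gt1" and
// tilesPerCard=2: COMPOSITE hierarchy yields "0.0,0.1" (card index "." tile index),
// FLAT/COMBINED yields "0,1" (card index * tilesPerCard + tile index). Note that the
// card index is the position of the card within the annotation, not the card number.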
func convertTileInfoToEnvMask(tileInfo string, tilesPerCard int, hierarchyMode string) string {
cards := strings.Split(tileInfo, ",")
tileIndices := make([]string, len(cards))
for i, cardTileCombos := range cards {
cardTileSplit := strings.Split(cardTileCombos, ":")
if len(cardTileSplit) != 2 {
klog.Warningf("invalid card tile combo string (%v)", cardTileCombos)
return ""
}
tiles := strings.Split(cardTileSplit[1], "+")
var maskItems []string
for _, tile := range tiles {
if !strings.HasPrefix(tile, "gt") {
klog.Warningf("invalid tile syntax (%v)", tile)
return ""
}
tileNoStr := strings.TrimPrefix(tile, "gt")
tileNo, err := strconv.ParseInt(tileNoStr, 10, 16)
if err != nil {
klog.Warningf("invalid tile syntax (%v)", tile)
return ""
}
maskItem := ""
if hierarchyMode == hierarchyModeComposite {
maskItem =
strconv.FormatInt(int64(i), 10) + "." +
strconv.FormatInt(tileNo, 10)
} else {
// This handles both FLAT and COMBINED hierarchy.
devIndex := i*tilesPerCard + int(tileNo)
maskItem = strconv.FormatInt(int64(devIndex), 10)
}
maskItems = append(maskItems, maskItem)
}
tileIndices[i] = strings.Join(maskItems, ",")
}
return strings.Join(tileIndices, ",")
}
// containerTileAffinityMask returns the tile affinity mask to use for a single container.
// The mask is passed to the Level-Zero affinity mask env variable to guide execution.
// gpuUsingContainerIndex 0 == first gpu-using container in the pod.
// The affinity mask is not needed for 1-tile GPUs: with those, exposing the GPU
// devices normally is enough to limit the container's access to the targeted devices.
// annotation example:
// gas-container-tiles=card0:gt0+gt1,card1:gt0|card2:gt1+gt2||card0:gt3.
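// For the first container above ("card0:gt0+gt1,card1:gt0") with tilesPerCard=2 and
// FLAT hierarchy, the resulting mask would be "0,1,2" (illustrative).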
func containerTileAffinityMask(pod *v1.Pod, gpuUsingContainerIndex, tilesPerCard int) string {
fullAnnotation := pod.Annotations[gasTileAnnotation]
onlyDividers := strings.Count(fullAnnotation, "|") == len(fullAnnotation)
if fullAnnotation == "" || onlyDividers || tilesPerCard <= 1 {
return ""
}
tileLists := strings.Split(fullAnnotation, "|")
klog.Infof("%s:%v", fullAnnotation, tileLists)
i := 0
for containerIndex, containerTileInfo := range tileLists {
if len(containerTileInfo) == 0 {
continue
}
if i == gpuUsingContainerIndex {
return convertTileInfoToEnvMask(containerTileInfo, tilesPerCard, guessLevelzeroHierarchyMode(pod, containerIndex))
}
i++
}
klog.Warningf("couldn't find tile info for gpu using container index %v", gpuUsingContainerIndex)
return ""
}
func getClientset() (*kubernetes.Clientset, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return clientset, nil
}