diff --git a/cmd/gpu_plugin/README.md b/cmd/gpu_plugin/README.md index b5e56e32..6811aeec 100644 --- a/cmd/gpu_plugin/README.md +++ b/cmd/gpu_plugin/README.md @@ -119,6 +119,30 @@ $ kubectl apply -k deployments/gpu_plugin/overlays/nfd_labeled_nodes daemonset.apps/intel-gpu-plugin created ``` +The experimental fractional-resource feature can be enabled by running: + +```bash +$ kubectl apply -k deployments/gpu_plugin/overlays/fractional_resources +serviceaccount/resource-reader-sa created +clusterrole.rbac.authorization.k8s.io/resource-reader created +clusterrolebinding.rbac.authorization.k8s.io/resource-reader-rb created +daemonset.apps/intel-gpu-plugin created +``` + +Using fractional GPU resources, such as GPU memory, requires that the cluster has node +extended resources with the `gpu.intel.com/` name prefix. Those can be created with NFD +by running the hook installed by the plugin initcontainer. When fractional resources are +enabled, the plugin lets a scheduler extender make the card selection decisions based on +resource availability and the amount of extended resources requested in the pod spec. + +The scheduler extender then needs to annotate the pod objects with unique, increasing +numeric timestamps in the `gas-ts` annotation and with container card selections in the +`gas-container-cards` annotation. The latter uses `|` as the container separator and `,` +as the card separator. For example, for a pod with two containers, each getting two +cards: `gas-container-cards:card0,card1|card2,card3`. Enabling fractional-resource support +in the plugin without running such an annotating scheduler extender in the cluster only +slows down GPU deployments, so do not enable this feature unnecessarily. + > **Note**: It is also possible to run the GPU device plugin using a non-root user. To do this, the nodes' DAC rules must be configured to device plugin socket creation and kubelet registration. Furthermore, the deployments `securityContext` must be configured with appropriate `runAsUser/runAsGroup`.
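To make the `gas-container-cards` format described above concrete, here is a minimal, self-contained sketch of how such an annotation value splits per container. It mirrors the separator convention documented in the README (and the `containerCards` helper added in the `rm` package below), but the `cardsForContainer` function and its plain index-based lookup are illustrative only, not part of the plugin's API:

```go
package main

import (
	"fmt"
	"strings"
)

// cardsForContainer returns the cards assigned to the Nth container listed in a
// gas-container-cards annotation value such as "card0,card1|card2,card3".
// Illustrative helper only; the real plugin indexes GPU-using containers.
func cardsForContainer(annotation string, containerIndex int) []string {
	containerLists := strings.Split(annotation, "|") // "|" separates containers
	if containerIndex < 0 || containerIndex >= len(containerLists) {
		return nil
	}
	return strings.Split(containerLists[containerIndex], ",") // "," separates cards
}

func main() {
	annotation := "card0,card1|card2,card3"
	fmt.Println(cardsForContainer(annotation, 0)) // [card0 card1]
	fmt.Println(cardsForContainer(annotation, 1)) // [card2 card3]
}
```

Note that the actual `containerCards` implementation added in this diff counts only GPU-using containers, so containers without GPU resource requests do not consume a slot in the annotation.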
diff --git a/cmd/gpu_plugin/gpu_plugin.go b/cmd/gpu_plugin/gpu_plugin.go index 16dedefa..f24cf20d 100644 --- a/cmd/gpu_plugin/gpu_plugin.go +++ b/cmd/gpu_plugin/gpu_plugin.go @@ -29,6 +29,7 @@ import ( "k8s.io/klog/v2" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + "github.com/intel/intel-device-plugins-for-kubernetes/cmd/gpu_plugin/rm" dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin" ) @@ -52,8 +53,9 @@ const ( ) type cliOptions struct { - sharedDevNum int - enableMonitoring bool + sharedDevNum int + enableMonitoring bool + resourceManagement bool } type devicePlugin struct { @@ -67,10 +69,12 @@ type devicePlugin struct { scanTicker *time.Ticker scanDone chan bool + + resMan rm.ResourceManager } func newDevicePlugin(sysfsDir, devfsDir string, options cliOptions) *devicePlugin { - return &devicePlugin{ + dp := &devicePlugin{ sysfsDir: sysfsDir, devfsDir: devfsDir, options: options, @@ -79,6 +83,17 @@ func newDevicePlugin(sysfsDir, devfsDir string, options cliOptions) *devicePlugi scanTicker: time.NewTicker(scanPeriod), scanDone: make(chan bool, 1), // buffered as we may send to it before Scan starts receiving from it } + + if options.resourceManagement { + var err error + dp.resMan, err = rm.NewResourceManager(monitorID, namespace+"/"+deviceType) + if err != nil { + klog.Errorf("Failed to create resource manager: %+v", err) + return nil + } + } + + return dp } func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error { @@ -132,6 +147,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) { var monitor []pluginapi.DeviceSpec devTree := dpapi.NewDeviceTree() + rmDevInfos := rm.NewDeviceInfoMap() for _, f := range files { var nodes []pluginapi.DeviceSpec @@ -180,6 +196,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) { // Currently only one device type (i915) is supported. // TODO: check model ID to differentiate device models. 
devTree.AddDevice(deviceType, devID, deviceInfo) + rmDevInfos[devID] = rm.NewDeviceInfo(nodes, nil, nil) } } } @@ -189,18 +206,36 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) { devTree.AddDevice(monitorType, monitorID, deviceInfo) } + if dp.resMan != nil { + dp.resMan.SetDevInfos(rmDevInfos) + } + return devTree, nil } +func (dp *devicePlugin) Allocate(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + if dp.resMan != nil { + return dp.resMan.ReallocateWithFractionalResources(request) + } + + return nil, &dpapi.UseDefaultMethodError{} +} + func main() { var opts cliOptions flag.BoolVar(&opts.enableMonitoring, "enable-monitoring", false, "whether to enable 'i915_monitoring' (= all GPUs) resource") + flag.BoolVar(&opts.resourceManagement, "resource-manager", false, "fractional GPU resource management") flag.IntVar(&opts.sharedDevNum, "shared-dev-num", 1, "number of containers sharing the same GPU device") flag.Parse() if opts.sharedDevNum < 1 { - klog.Warning("The number of containers sharing the same GPU must greater than zero") + klog.Error("The number of containers sharing the same GPU must be greater than zero") + os.Exit(1) + } + + if opts.sharedDevNum == 1 && opts.resourceManagement { + klog.Error("Trying to use fractional resources with shared-dev-num 1 is pointless") + os.Exit(1) + } diff --git a/cmd/gpu_plugin/gpu_plugin_test.go b/cmd/gpu_plugin/gpu_plugin_test.go index 2c32d333..85324a8d 100644 --- a/cmd/gpu_plugin/gpu_plugin_test.go +++ b/cmd/gpu_plugin/gpu_plugin_test.go @@ -22,7 +22,9 @@ import ( "testing" "github.com/pkg/errors" + "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + "github.com/intel/intel-device-plugins-for-kubernetes/cmd/gpu_plugin/rm" dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin" ) @@ -44,6 +46,13 @@ func (n *mockNotifier) Notify(newDeviceTree dpapi.DeviceTree) { n.scanDone <- true } +type mockResourceManager struct{} + +func (m *mockResourceManager) ReallocateWithFractionalResources(*v1beta1.AllocateRequest) (*v1beta1.AllocateResponse, error) { + return &v1beta1.AllocateResponse{}, &dpapi.UseDefaultMethodError{} +} +func (m *mockResourceManager) SetDevInfos(rm.DeviceInfoMap) {} + func createTestFiles(root string, devfsdirs, sysfsdirs []string, sysfsfiles map[string][]byte) (string, string, error) { sysfs := path.Join(root, "sys") devfs := path.Join(root, "dev") @@ -66,6 +75,30 @@ func createTestFiles(root string, devfsdirs, sysfsdirs []string, sysfsfiles map[ return sysfs, devfs, nil } +func TestNewDevicePlugin(t *testing.T) { + if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false}) == nil { + t.Error("Failed to create plugin") + } + if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: true}) != nil { + t.Error("Unexpectedly managed to create resource management enabled plugin inside unit tests") + } +} + +func TestAllocate(t *testing.T) { + plugin := newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false}) + _, err := plugin.Allocate(&v1beta1.AllocateRequest{}) + if _, ok := err.(*dpapi.UseDefaultMethodError); !ok { + t.Errorf("Unexpected return value: %+v", err) + } + + // mock the rm + plugin.resMan = &mockResourceManager{} + _, err = plugin.Allocate(&v1beta1.AllocateRequest{}) + if _, ok := err.(*dpapi.UseDefaultMethodError); !ok { + t.Errorf("Unexpected return value: %+v", err) + } +} + func TestScan(t *testing.T) { tcases := []struct { name string @@ -204,6 +237,8 @@ func TestScan(t *testing.T) { scanDone:
plugin.scanDone, } + plugin.resMan = &mockResourceManager{} + err = plugin.Scan(notifier) // Scans in GPU plugin never fail if err != nil { diff --git a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go new file mode 100644 index 00000000..eddf3021 --- /dev/null +++ b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go @@ -0,0 +1,408 @@ +// Copyright 2021 Intel Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rm + +import ( + "context" + "os" + "sort" + "strings" + "sync" + "time" + + dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin" + "github.com/pkg/errors" + "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" +) + +const ( + gasTSAnnotation = "gas-ts" + gasCardAnnotation = "gas-container-cards" + + grpcAddress = "unix:///var/lib/kubelet/pod-resources/kubelet.sock" + grpcBufferSize = 4 * 1024 * 1024 + grpcTimeout = 5 * time.Second + + retryTimeout = 1 * time.Second +) + +// Errors. +type retryErr struct{} +type zeroPendingErr struct{} + +func (e *retryErr) Error() string { + return "things didn't work out, but perhaps a retry will help" +} +func (e *zeroPendingErr) Error() string { + return "there are no pending pods anymore in this node" +} + +type podCandidate struct { + pod *v1.Pod + name string + allocatedContainers int + allocationTargetNum int +} + +type deviceInfo struct { + nodes []pluginapi.DeviceSpec + mounts []pluginapi.Mount + envs map[string]string +} + +type getClientFunc func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error) + +type ResourceManager interface { + ReallocateWithFractionalResources(*pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) + SetDevInfos(DeviceInfoMap) +} + +type resourceManager struct { + mutex sync.RWMutex // for devTree updates during scan + deviceInfos DeviceInfoMap + nodeName string + clientset kubernetes.Interface + skipID string + fullResourceName string + prGetClientFunc getClientFunc +} + +func NewDeviceInfo(nodes []pluginapi.DeviceSpec, mounts []pluginapi.Mount, envs map[string]string) *deviceInfo { + return &deviceInfo{ + nodes: nodes, + mounts: mounts, + envs: envs, + } +} + +type DeviceInfoMap map[string]*deviceInfo + +func NewDeviceInfoMap() DeviceInfoMap { + return DeviceInfoMap{} +} + +// NewResourceManager creates a new resource manager. 
+func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) { + clientset, err := getClientset() + + if err != nil { + return nil, errors.Wrap(err, "couldn't get clientset") + } + + rm := resourceManager{ + nodeName: os.Getenv("NODE_NAME"), + clientset: clientset, + skipID: skipID, + fullResourceName: fullResourceName, + prGetClientFunc: podresources.GetV1Client, + } + + klog.Info("GPU device plugin resource manager enabled") + + return &rm, nil +} + +// ReallocateWithFractionalResources runs the fractional resource logic. +// This intentionally only logs errors and returns with the UseDefaultMethodError, +// in case any errors are hit. This is to avoid clusters filling up with unexpected admission errors. +func (rm *resourceManager) ReallocateWithFractionalResources(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + if !isInputOk(request, rm.skipID) { + // it is better to leave allocated gpu devices as is and return + return nil, &dpapi.UseDefaultMethodError{} + } + + podCandidate, err := rm.findAllocationPodCandidate() + if _, ok := err.(*retryErr); ok { + klog.Warning("retrying POD resolving after sleeping") + time.Sleep(retryTimeout) + podCandidate, err = rm.findAllocationPodCandidate() + } + + if err != nil { + if _, ok := err.(*zeroPendingErr); !ok { + klog.Error("allocation candidate not found, perhaps the GPU scheduler extender is not called, err:", err) + } + // it is better to leave allocated gpu devices as is and return + return nil, &dpapi.UseDefaultMethodError{} + } + + pod := podCandidate.pod + cards := containerCards(pod, podCandidate.allocatedContainers) + + return rm.createAllocateResponse(cards) +} + +func isInputOk(rqt *pluginapi.AllocateRequest, skipID string) bool { + // so far kubelet calls allocate for each container separately. If that changes, we need to refine our logic. 
+ if len(rqt.ContainerRequests) != 1 { + klog.Warning("multi-container allocation request not supported") + return false + } + + crqt := rqt.ContainerRequests[0] + for _, id := range crqt.DevicesIDs { + if id == skipID { + return false // intentionally not printing anything, this request is skipped + } + } + + return true +} + +// findAllocationPodCandidate tries to find the best allocation candidate pod, which must be: +// -pending for this node +// -using GPU resources in its spec +// -found via the grpc service with unallocated GPU devices +// returns: +// -the candidate pod struct pointer and no error, or +// -retryErr if unsuccessful, but there is perhaps hope of trying again with better luck +// -zeroPendingErr if no pending pods exist anymore (which is fine) +// -any grpc communication errors +func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) { + // get map of pending pods for this node + pendingPods, err := rm.getNodePendingGPUPods() + if err != nil { + return nil, err + } + + candidates, err := rm.findAllocationPodCandidates(pendingPods) + if err != nil { + return nil, err + } + + numCandidates := len(candidates) + switch numCandidates { + case 0: + // fine, this typically happens when deployment is deleted before PODs start + klog.V(4).Info("zero pending pods") + return nil, &zeroPendingErr{} + case 1: + // perfect, only one option + klog.V(4).Info("only one pending pod") + if _, ok := candidates[0].pod.Annotations[gasCardAnnotation]; !ok { + klog.Warningf("Pending POD annotations from scheduler not yet visible for pod %q", candidates[0].pod.Name) + return nil, &retryErr{} + } + return &candidates[0], nil + default: // > 1 candidates, not good, need to pick the best + // look for scheduler timestamps and sort by them + klog.V(4).Infof("%v pods pending, picking oldest", numCandidates) + timestampedCandidates := []podCandidate{} + for _, candidate := range candidates { + if _, ok := pendingPods[candidate.name].Annotations[gasTSAnnotation]; ok { + timestampedCandidates = append(timestampedCandidates, candidate) + } + } + sort.Slice(timestampedCandidates, + func(i, j int) bool { + return pendingPods[timestampedCandidates[i].name].Annotations[gasTSAnnotation] < + pendingPods[timestampedCandidates[j].name].Annotations[gasTSAnnotation] + }) + if len(timestampedCandidates) == 0 { + klog.Warning("Pending POD annotations from scheduler not yet visible") + return nil, &retryErr{} + } + return &timestampedCandidates[0], nil + } +} + +// +kubebuilder:rbac:groups="",resources=pods,verbs=list + +// getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
+func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) { + selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName + + ",status.phase=" + string(v1.PodPending)) + + if err != nil { + return nil, errors.Wrap(err, "unable to parse selector") + } + + pendingPodList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{ + FieldSelector: selector.String(), + }) + + if err != nil { + return nil, errors.Wrap(err, "unable to list pods") + } + + // make a map out of the list, accept only GPU-using pods + pendingPods := make(map[string]*v1.Pod) + for i := range pendingPodList.Items { + pod := &pendingPodList.Items[i] + + if numGPUUsingContainers(pod, rm.fullResourceName) > 0 { + pendingPods[pod.Name] = pod + } + } + + return pendingPods, nil +} + +// findAllocationPodCandidates returns a slice of all potential allocation candidate pods. +// This goes through the PODs listed in the podresources grpc service and finds those among pending +// pods which don't have all GPU devices allocated. +func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v1.Pod) ([]podCandidate, error) { + resListerClient, clientConn, err := rm.prGetClientFunc(grpcAddress, grpcTimeout, grpcBufferSize) + if err != nil { + return nil, errors.Wrap(err, "Could not get a grpc client for reading plugin resources") + } + + defer clientConn.Close() + ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout) + defer cancel() + + resp, err := resListerClient.List(ctx, &podresourcesv1.ListPodResourcesRequest{}) + if err != nil { + return nil, errors.Wrap(err, "Could not read plugin resources via grpc") + } + + candidates := []podCandidate{} + for _, podRes := range resp.PodResources { + // count allocated gpu-using containers + numContainersAllocated := 0 + for _, cont := range podRes.Containers { + for _, dev := range cont.Devices { + if dev.ResourceName == rm.fullResourceName { + numContainersAllocated++ + break + } + } + } + + if pod, pending := pendingPods[podRes.Name]; pending { + allocationTargetNum := numGPUUsingContainers(pod, rm.fullResourceName) + if numContainersAllocated < allocationTargetNum { + candidate := podCandidate{ + pod: pod, + name: podRes.Name, + allocatedContainers: numContainersAllocated, + allocationTargetNum: allocationTargetNum, + } + candidates = append(candidates, candidate) + } + } + } + + return candidates, nil +} + +func (rm *resourceManager) SetDevInfos(deviceInfos DeviceInfoMap) { + rm.mutex.Lock() + defer rm.mutex.Unlock() + rm.deviceInfos = deviceInfos +} + +func (rm *resourceManager) createAllocateResponse(cards []string) (*pluginapi.AllocateResponse, error) { + rm.mutex.Lock() + defer rm.mutex.Unlock() + + allocateResponse := pluginapi.AllocateResponse{} + cresp := pluginapi.ContainerAllocateResponse{} + + for _, card := range cards { + newDeviceID := card + "-0" + + dev, ok := rm.deviceInfos[newDeviceID] + if !ok { + klog.Warningf("No device info for %q, using default allocation method devices", newDeviceID) + return nil, &dpapi.UseDefaultMethodError{} + } + + // add new devices + nodes := dev.nodes + for i := range nodes { + cresp.Devices = append(cresp.Devices, &nodes[i]) + } + + // add new mounts + mounts := dev.mounts + for i := range mounts { + cresp.Mounts = append(cresp.Mounts, &mounts[i]) + } + + for key, value := range dev.envs { + if cresp.Envs == nil { + cresp.Envs = make(map[string]string) + } + cresp.Envs[key] = value + } + } + + allocateResponse.ContainerResponses =
append(allocateResponse.ContainerResponses, &cresp) + + return &allocateResponse, nil +} + +func numGPUUsingContainers(pod *v1.Pod, fullResourceName string) int { + num := 0 + for _, container := range pod.Spec.Containers { + for reqName, quantity := range container.Resources.Requests { + resourceName := reqName.String() + if resourceName == fullResourceName { + value, _ := quantity.AsInt64() + if value > 0 { + num++ + break + } + } + } + } + return num +} + +// containerCards returns the cards to use for a single container. +// gpuUsingContainerIndex 0 == first gpu-using container in the pod. +func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string { + fullAnnotation := pod.Annotations[gasCardAnnotation] + cardLists := strings.Split(fullAnnotation, "|") + klog.V(3).Infof("%s:%v", fullAnnotation, cardLists) + + i := 0 + for _, cardList := range cardLists { + cards := strings.Split(cardList, ",") + if len(cards) > 0 && len(cardList) > 0 { + if gpuUsingContainerIndex == i { + klog.V(3).Infof("Cards for container nr %v in pod %v are %v", gpuUsingContainerIndex, pod.Name, cards) + return cards + } + i++ + } + } + klog.Warningf("couldn't find cards for gpu using container index %v", gpuUsingContainerIndex) + return nil +} + +func getClientset() (*kubernetes.Clientset, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + + return clientset, nil +} diff --git a/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go new file mode 100644 index 00000000..ee56bbde --- /dev/null +++ b/cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go @@ -0,0 +1,241 @@ +// Copyright 2021 Intel Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rm + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "google.golang.org/grpc" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" + "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" + podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" +) + +// mockClient implements enough of k8s API for the resource manager tests. 
+type mockClient struct { + fake.Clientset + mockCoreV1 +} + +func (m *mockClient) CoreV1() corev1.CoreV1Interface { + return m +} + +type mockCoreV1 struct { + fakecorev1.FakeCoreV1 + mockPods +} + +func (m *mockCoreV1) Pods(namespace string) corev1.PodInterface { + return m +} + +type mockPods struct { + fakecorev1.FakePods + pods []v1.Pod +} + +func (m *mockPods) List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) { + return &v1.PodList{ + Items: m.pods, + }, nil +} + +type mockPodResources struct { + pods []v1.Pod +} + +func (w *mockPodResources) List(ctx context.Context, + in *podresourcesv1.ListPodResourcesRequest, + opts ...grpc.CallOption) (*podresourcesv1.ListPodResourcesResponse, error) { + resp := podresourcesv1.ListPodResourcesResponse{} + for _, pod := range w.pods { + resp.PodResources = append(resp.PodResources, &podresourcesv1.PodResources{ + Name: pod.ObjectMeta.Name, Containers: []*podresourcesv1.ContainerResources{{}}, + }) + } + return &resp, nil +} +func (w *mockPodResources) GetAllocatableResources(ctx context.Context, + in *podresourcesv1.AllocatableResourcesRequest, + opts ...grpc.CallOption) (*podresourcesv1.AllocatableResourcesResponse, error) { + return nil, nil +} + +func newMockResourceManager(pods []v1.Pod) ResourceManager { + client, err := grpc.Dial("", grpc.WithInsecure()) + if err != nil { + os.Exit(1) + } + + mc := &mockClient{} + mc.mockCoreV1.mockPods.pods = pods + rm := resourceManager{ + clientset: mc, + nodeName: "TestNode", + prGetClientFunc: func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error) { + return &mockPodResources{pods: pods}, client, nil + }, + skipID: "all", + fullResourceName: "gpu.intel.com/i915", + } + + deviceInfoMap := NewDeviceInfoMap() + deviceInfoMap["card0-0"] = NewDeviceInfo([]v1beta1.DeviceSpec{ + { + ContainerPath: "containerpath", + HostPath: "hostpath", + Permissions: "rw", + }, + }, + []v1beta1.Mount{{}}, + map[string]string{"more": "coverage"}) + deviceInfoMap["card1-0"] = NewDeviceInfo([]v1beta1.DeviceSpec{{}}, nil, nil) + rm.SetDevInfos(deviceInfoMap) + + return &rm +} + +func TestNewResourceManager(t *testing.T) { + // normal clientset is unavailable inside the unit tests + _, err := NewResourceManager("foo", "bar") + + if err == nil { + t.Errorf("unexpected success") + } +} + +func TestReallocateWithFractionalResources(t *testing.T) { + properTestPod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{gasCardAnnotation: "card0"}, + Name: "TestPod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + "gpu.intel.com/i915": resource.MustParse("1"), + }, + }, + }, + }, + }, + } + unAnnotatedTestPod := *properTestPod.DeepCopy() + unAnnotatedTestPod.ObjectMeta.Annotations = nil + properTestPod2 := *properTestPod.DeepCopy() + properTestPod2.ObjectMeta.Name = "TestPod2" + + timeStampedProperTestPod := *properTestPod.DeepCopy() + timeStampedProperTestPod.ObjectMeta.Annotations[gasTSAnnotation] = "2" + + timeStampedProperTestPod2 := *properTestPod2.DeepCopy() + timeStampedProperTestPod2.ObjectMeta.Annotations[gasTSAnnotation] = "1" + + properContainerRequests := []*v1beta1.ContainerAllocateRequest{{DevicesIDs: []string{"card0-0"}}} + + testCases := []struct { + name string + pods []v1.Pod + expectErr bool + containerRequests []*v1beta1.ContainerAllocateRequest + }{ + { + name: "Wrong number of container requests should fail", + pods: []v1.Pod{properTestPod}, + 
containerRequests: []*v1beta1.ContainerAllocateRequest{}, + expectErr: true, + }, + { + name: "Request for monitor resource should fail", + pods: []v1.Pod{properTestPod}, + containerRequests: []*v1beta1.ContainerAllocateRequest{{DevicesIDs: []string{"all"}}}, + expectErr: true, + }, + { + name: "Zero pending pods should fail", + pods: []v1.Pod{}, + containerRequests: properContainerRequests, + expectErr: true, + }, + { + name: "Single pending pod without annotations should fail", + pods: []v1.Pod{unAnnotatedTestPod}, + containerRequests: properContainerRequests, + expectErr: true, + }, + { + name: "Single pending pod should succeed", + pods: []v1.Pod{properTestPod}, + containerRequests: properContainerRequests, + expectErr: false, + }, + { + name: "Two pending pods without timestamps should fail", + pods: []v1.Pod{properTestPod, properTestPod2}, + containerRequests: properContainerRequests, + expectErr: true, + }, + { + name: "Two pending pods with timestamps should reduce to one candidate and succeed", + pods: []v1.Pod{timeStampedProperTestPod, timeStampedProperTestPod2}, + containerRequests: properContainerRequests, + expectErr: false, + }, + } + + for _, tCase := range testCases { + rm := newMockResourceManager(tCase.pods) + resp, err := rm.ReallocateWithFractionalResources(&v1beta1.AllocateRequest{ + ContainerRequests: tCase.containerRequests, + }) + + if (err != nil) && !tCase.expectErr { + t.Errorf("test %v unexpected failure, err:%v", tCase.name, err) + } + if err == nil { + if tCase.expectErr { + t.Errorf("test %v unexpected success", tCase.name) + } else { + // check response + expectTruef(len(resp.ContainerResponses) == 1, t, tCase.name, "wrong number of container responses, expected 1") + expectTruef(len(resp.ContainerResponses[0].Envs) == 1, t, tCase.name, "wrong number of env variables in container response, expected 1") + expectTruef(resp.ContainerResponses[0].Envs["more"] == "coverage", t, tCase.name, "env not set for container response") + expectTruef(len(resp.ContainerResponses[0].Devices) == 1, t, tCase.name, "wrong number of devices, expected 1") + expectTruef(resp.ContainerResponses[0].Devices[0].HostPath == "hostpath", t, tCase.name, "HostPath not set for device") + expectTruef(resp.ContainerResponses[0].Devices[0].ContainerPath == "containerpath", t, tCase.name, "ContainerPath not set for device") + expectTruef(resp.ContainerResponses[0].Devices[0].Permissions == "rw", t, tCase.name, "permissions not set for device") + } + } + } +} + +func expectTruef(predicate bool, t *testing.T, testName, format string, args ...interface{}) { + if !predicate { + t.Helper() + t.Errorf(fmt.Sprintf("in test %q ", testName)+format, args...) 
+ } +} diff --git a/deployments/gpu_plugin/overlays/fractional_resources/add-args.yaml b/deployments/gpu_plugin/overlays/fractional_resources/add-args.yaml new file mode 100644 index 00000000..c1b5ba7b --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/add-args.yaml @@ -0,0 +1,13 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: intel-gpu-plugin +spec: + template: + spec: + containers: + - name: intel-gpu-plugin + args: + - "-shared-dev-num=300" + - "-enable-monitoring" + - "-resource-manager" diff --git a/deployments/gpu_plugin/overlays/fractional_resources/add-podresource-mount.yaml b/deployments/gpu_plugin/overlays/fractional_resources/add-podresource-mount.yaml new file mode 100644 index 00000000..d127334f --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/add-podresource-mount.yaml @@ -0,0 +1,17 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: intel-gpu-plugin +spec: + template: + spec: + containers: + - name: intel-gpu-plugin + volumeMounts: + - name: podresources + mountPath: /var/lib/kubelet/pod-resources + volumes: + - name: podresources + hostPath: + path: /var/lib/kubelet/pod-resources + \ No newline at end of file diff --git a/deployments/gpu_plugin/overlays/fractional_resources/add-serviceaccount.yaml b/deployments/gpu_plugin/overlays/fractional_resources/add-serviceaccount.yaml new file mode 100644 index 00000000..2926657b --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/add-serviceaccount.yaml @@ -0,0 +1,8 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: intel-gpu-plugin +spec: + template: + spec: + serviceAccountName: resource-reader-sa diff --git a/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml b/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml new file mode 100644 index 00000000..991ddcd5 --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/kustomization.yaml @@ -0,0 +1,10 @@ +bases: + - ../../base +resources: + - resource-cluster-role-binding.yaml + - resource-cluster-role.yaml + - resource-reader-sa.yaml +patches: + - add-serviceaccount.yaml + - add-podresource-mount.yaml + - add-args.yaml \ No newline at end of file diff --git a/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role-binding.yaml b/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role-binding.yaml new file mode 100644 index 00000000..f46439f3 --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role-binding.yaml @@ -0,0 +1,12 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: resource-reader-rb +subjects: +- kind: ServiceAccount + name: resource-reader-sa + namespace: default +roleRef: + kind: ClusterRole + name: resource-reader + apiGroup: rbac.authorization.k8s.io diff --git a/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role.yaml b/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role.yaml new file mode 100644 index 00000000..cca48ccc --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/resource-cluster-role.yaml @@ -0,0 +1,8 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: resource-reader +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["list"] diff --git a/deployments/gpu_plugin/overlays/fractional_resources/resource-reader-sa.yaml b/deployments/gpu_plugin/overlays/fractional_resources/resource-reader-sa.yaml 
new file mode 100644 index 00000000..a2879ece --- /dev/null +++ b/deployments/gpu_plugin/overlays/fractional_resources/resource-reader-sa.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: resource-reader-sa
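To close, a compact illustration of the candidate ordering performed by `findAllocationPodCandidate` above: when more than one GPU-using pod is pending on the node, the pod carrying the smallest `gas-ts` annotation is picked. The following is a minimal sketch of that ordering with made-up pod names and timestamp strings, not the resource manager's actual data structures:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical gas-ts annotation values as written by the scheduler extender.
	pendingPodTimestamps := map[string]string{
		"pod-a": "1633024800000000002",
		"pod-b": "1633024800000000001",
	}

	// Keep only pods that already carry a gas-ts annotation.
	names := make([]string, 0, len(pendingPodTimestamps))
	for name, ts := range pendingPodTimestamps {
		if ts != "" {
			names = append(names, name)
		}
	}

	// Oldest timestamp first, matching the sort in findAllocationPodCandidate.
	sort.Slice(names, func(i, j int) bool {
		return pendingPodTimestamps[names[i]] < pendingPodTimestamps[names[j]]
	})

	fmt.Println("allocation candidate order:", names) // [pod-b pod-a]
}
```

As in the resource manager itself, the comparison is lexicographic on the annotation strings, which agrees with numeric order as long as the extender writes timestamps of equal length.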